diff --git a/src/core/tests.rs b/src/core/tests.rs index f2a26e0382..e215c31f46 100644 --- a/src/core/tests.rs +++ b/src/core/tests.rs @@ -1,12 +1,13 @@ use crate::collector::Count; use crate::directory::{RamDirectory, WatchCallback}; -use crate::indexer::NoMergePolicy; +use crate::indexer::{LogMergePolicy, NoMergePolicy}; +use crate::json_utils::JsonTermWriter; use crate::query::TermQuery; -use crate::schema::{Field, IndexRecordOption, Schema, INDEXED, STRING, TEXT}; +use crate::schema::{Field, IndexRecordOption, Schema, Type, INDEXED, STRING, TEXT}; use crate::tokenizer::TokenizerManager; use crate::{ - Directory, Index, IndexBuilder, IndexReader, IndexSettings, IndexWriter, ReloadPolicy, - SegmentId, TantivyDocument, Term, + Directory, DocSet, Index, IndexBuilder, IndexReader, IndexSettings, IndexWriter, Postings, + ReloadPolicy, SegmentId, TantivyDocument, Term, }; #[test] @@ -344,3 +345,132 @@ fn test_merging_segment_update_docfreq() { let term_info = inv_index.get_term_info(&term).unwrap().unwrap(); assert_eq!(term_info.doc_freq, 12); } + +// motivated by https://github.com/quickwit-oss/quickwit/issues/4130 +#[test] +fn test_positions_merge_bug_non_text_json_vint() { + let mut schema_builder = Schema::builder(); + let field = schema_builder.add_json_field("dynamic", TEXT); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema.clone()); + let mut writer: IndexWriter = index.writer_for_tests().unwrap(); + let mut merge_policy = LogMergePolicy::default(); + merge_policy.set_min_num_segments(2); + writer.set_merge_policy(Box::new(merge_policy)); + // Here a string would work. + let doc_json = r#"{"tenant_id":75}"#; + let vals = serde_json::from_str(doc_json).unwrap(); + let mut doc = TantivyDocument::default(); + doc.add_object(field, vals); + writer.add_document(doc.clone()).unwrap(); + writer.commit().unwrap(); + writer.add_document(doc.clone()).unwrap(); + writer.commit().unwrap(); + writer.wait_merging_threads().unwrap(); + let reader = index.reader().unwrap(); + assert_eq!(reader.searcher().segment_readers().len(), 1); +} + +// Same as above but with bitpacked blocks +#[test] +fn test_positions_merge_bug_non_text_json_bitpacked_block() { + let mut schema_builder = Schema::builder(); + let field = schema_builder.add_json_field("dynamic", TEXT); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema.clone()); + let mut writer: IndexWriter = index.writer_for_tests().unwrap(); + let mut merge_policy = LogMergePolicy::default(); + merge_policy.set_min_num_segments(2); + writer.set_merge_policy(Box::new(merge_policy)); + // Here a string would work. + let doc_json = r#"{"tenant_id":75}"#; + let vals = serde_json::from_str(doc_json).unwrap(); + let mut doc = TantivyDocument::default(); + doc.add_object(field, vals); + for _ in 0..128 { + writer.add_document(doc.clone()).unwrap(); + } + writer.commit().unwrap(); + writer.add_document(doc.clone()).unwrap(); + writer.commit().unwrap(); + writer.wait_merging_threads().unwrap(); + let reader = index.reader().unwrap(); + assert_eq!(reader.searcher().segment_readers().len(), 1); +} + +#[test] +fn test_non_text_json_term_freq() { + let mut schema_builder = Schema::builder(); + let field = schema_builder.add_json_field("dynamic", TEXT); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema.clone()); + let mut writer: IndexWriter = index.writer_for_tests().unwrap(); + // Here a string would work. + let doc_json = r#"{"tenant_id":75}"#; + let vals = serde_json::from_str(doc_json).unwrap(); + let mut doc = TantivyDocument::default(); + doc.add_object(field, vals); + writer.add_document(doc.clone()).unwrap(); + writer.commit().unwrap(); + let reader = index.reader().unwrap(); + assert_eq!(reader.searcher().segment_readers().len(), 1); + let searcher = reader.searcher(); + let segment_reader = searcher.segment_reader(0u32); + let inv_idx = segment_reader.inverted_index(field).unwrap(); + let mut term = Term::with_type_and_field(Type::Json, field); + let mut json_term_writer = JsonTermWriter::wrap(&mut term, false); + json_term_writer.push_path_segment("tenant_id"); + json_term_writer.close_path_and_set_type(Type::U64); + json_term_writer.set_fast_value(75u64); + let postings = inv_idx + .read_postings( + &json_term_writer.term(), + IndexRecordOption::WithFreqsAndPositions, + ) + .unwrap() + .unwrap(); + assert_eq!(postings.doc(), 0); + assert_eq!(postings.term_freq(), 1u32); +} + +#[test] +fn test_non_text_json_term_freq_bitpacked() { + let mut schema_builder = Schema::builder(); + let field = schema_builder.add_json_field("dynamic", TEXT); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema.clone()); + let mut writer: IndexWriter = index.writer_for_tests().unwrap(); + // Here a string would work. + let doc_json = r#"{"tenant_id":75}"#; + let vals = serde_json::from_str(doc_json).unwrap(); + let mut doc = TantivyDocument::default(); + doc.add_object(field, vals); + let num_docs = 132; + for _ in 0..num_docs { + writer.add_document(doc.clone()).unwrap(); + } + writer.commit().unwrap(); + let reader = index.reader().unwrap(); + assert_eq!(reader.searcher().segment_readers().len(), 1); + let searcher = reader.searcher(); + let segment_reader = searcher.segment_reader(0u32); + let inv_idx = segment_reader.inverted_index(field).unwrap(); + let mut term = Term::with_type_and_field(Type::Json, field); + let mut json_term_writer = JsonTermWriter::wrap(&mut term, false); + json_term_writer.push_path_segment("tenant_id"); + json_term_writer.close_path_and_set_type(Type::U64); + json_term_writer.set_fast_value(75u64); + let mut postings = inv_idx + .read_postings( + &json_term_writer.term(), + IndexRecordOption::WithFreqsAndPositions, + ) + .unwrap() + .unwrap(); + assert_eq!(postings.doc(), 0); + assert_eq!(postings.term_freq(), 1u32); + for i in 1..num_docs { + assert_eq!(postings.advance(), i); + assert_eq!(postings.term_freq(), 1u32); + } +} diff --git a/src/indexer/merger.rs b/src/indexer/merger.rs index 6c7837a497..87bc4c8c8c 100644 --- a/src/indexer/merger.rs +++ b/src/indexer/merger.rs @@ -552,7 +552,41 @@ impl IndexMerger { continue; } - field_serializer.new_term(term_bytes, total_doc_freq)?; + // This should never happen as we early exited for total_doc_freq == 0. + assert!(!segment_postings_containing_the_term.is_empty()); + + let has_term_freq = { + let has_term_freq = !segment_postings_containing_the_term[0] + .1 + .block_cursor + .freqs() + .is_empty(); + for (_, postings) in &segment_postings_containing_the_term[1..] { + // This may look at a strange way to test whether we have term freq or not. + // With JSON object, the schema is not sufficient to know whether a term + // has its term frequency encoded or not: + // strings may have term frequencies, while number terms never have one. + // + // Ideally, we should have burnt one bit of two in the `TermInfo`. + // However, we preferred not changing the codec too much and detect this + // instead by + // - looking at the size of the skip data for bitpacked blocks + // - observing the absence of remaining data after reading the docs for vint + // blocks. + // + // Overall the reliable way to know if we have actual frequencies loaded or not + // is to check whether the actual decoded array is empty or not. + if has_term_freq != !postings.block_cursor.freqs().is_empty() { + return Err(DataCorruption::comment_only( + "Term freqs are inconsistent across segments", + ) + .into()); + } + } + has_term_freq + }; + + field_serializer.new_term(term_bytes, total_doc_freq, has_term_freq)?; // We can now serialize this postings, by pushing each document to the // postings serializer. @@ -567,8 +601,13 @@ impl IndexMerger { if let Some(remapped_doc_id) = old_to_new_doc_id[doc as usize] { // we make sure to only write the term if // there is at least one document. - let term_freq = segment_postings.term_freq(); - segment_postings.positions(&mut positions_buffer); + let term_freq = if has_term_freq { + segment_postings.positions(&mut positions_buffer); + segment_postings.term_freq() + } else { + 0u32 + }; + // if doc_id_mapping exists, the doc_ids are reordered, they are // not just stacked. The field serializer expects monotonically increasing // doc_ids, so we collect and sort them first, before writing. diff --git a/src/postings/json_postings_writer.rs b/src/postings/json_postings_writer.rs index db337f4587..9f0d8eb06f 100644 --- a/src/postings/json_postings_writer.rs +++ b/src/postings/json_postings_writer.rs @@ -11,6 +11,10 @@ use crate::schema::{Field, Type, JSON_END_OF_PATH}; use crate::tokenizer::TokenStream; use crate::{DocId, Term}; +/// The `JsonPostingsWriter` is odd in that it relies on a hidden contract: +/// +/// `subscribe` is called directly to index non-text tokens, while +/// `index_text` is used to index text. #[derive(Default)] pub(crate) struct JsonPostingsWriter { str_posting_writer: SpecializedPostingsWriter, diff --git a/src/postings/mod.rs b/src/postings/mod.rs index 53f51ad2a5..32c4b7bd89 100644 --- a/src/postings/mod.rs +++ b/src/postings/mod.rs @@ -63,7 +63,7 @@ pub mod tests { let mut segment = index.new_segment(); let mut posting_serializer = InvertedIndexSerializer::open(&mut segment)?; let mut field_serializer = posting_serializer.new_field(text_field, 120 * 4, None)?; - field_serializer.new_term("abc".as_bytes(), 12u32)?; + field_serializer.new_term("abc".as_bytes(), 12u32, true)?; for doc_id in 0u32..120u32 { let delta_positions = vec![1, 2, 3, 2]; field_serializer.write_doc(doc_id, 4, &delta_positions); diff --git a/src/postings/postings_writer.rs b/src/postings/postings_writer.rs index c51d4d834b..943bc11b55 100644 --- a/src/postings/postings_writer.rs +++ b/src/postings/postings_writer.rs @@ -194,7 +194,11 @@ impl SpecializedPostingsWriter { ) -> io::Result<()> { let recorder: Rec = ctx.term_index.read(addr); let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32); - serializer.new_term(term, term_doc_freq)?; + serializer.new_term( + term, + term_doc_freq, + recorder.has_term_freq(), + )?; recorder.serialize(&ctx.arena, doc_id_map, serializer, buffer_lender); serializer.close_term()?; Ok(()) diff --git a/src/postings/recorder.rs b/src/postings/recorder.rs index 9620f155b6..767441f641 100644 --- a/src/postings/recorder.rs +++ b/src/postings/recorder.rs @@ -79,6 +79,11 @@ pub(crate) trait Recorder: Copy + Default + Send + Sync + 'static { /// /// Returns `None` if not available. fn term_doc_freq(&self) -> Option; + + #[inline] + fn has_term_freq(&self) -> bool { + true + } } /// Only records the doc ids @@ -136,6 +141,10 @@ impl Recorder for DocIdRecorder { fn term_doc_freq(&self) -> Option { None } + + fn has_term_freq(&self) -> bool { + false + } } /// Takes an Iterator of delta encoded elements and returns an iterator diff --git a/src/postings/segment_postings.rs b/src/postings/segment_postings.rs index 2b9ee8c5c9..3d91cf2ee2 100644 --- a/src/postings/segment_postings.rs +++ b/src/postings/segment_postings.rs @@ -71,7 +71,7 @@ impl SegmentPostings { { let mut postings_serializer = PostingsSerializer::new(&mut buffer, 0.0, IndexRecordOption::Basic, None); - postings_serializer.new_term(docs.len() as u32); + postings_serializer.new_term(docs.len() as u32, false); for &doc in docs { postings_serializer.write_doc(doc, 1u32); } @@ -120,7 +120,7 @@ impl SegmentPostings { IndexRecordOption::WithFreqs, fieldnorm_reader, ); - postings_serializer.new_term(doc_and_tfs.len() as u32); + postings_serializer.new_term(doc_and_tfs.len() as u32, true); for &(doc, tf) in doc_and_tfs { postings_serializer.write_doc(doc, tf); } @@ -238,14 +238,18 @@ impl Postings for SegmentPostings { } fn positions_with_offset(&mut self, offset: u32, output: &mut Vec) { - let term_freq = self.term_freq() as usize; + let term_freq = self.term_freq(); if let Some(position_reader) = self.position_reader.as_mut() { + debug_assert!( + !self.block_cursor.freqs().is_empty(), + "No positions available" + ); let read_offset = self.block_cursor.position_offset() + (self.block_cursor.freqs()[..self.cur] .iter() .cloned() .sum::() as u64); - output.resize(term_freq, 0u32); + output.resize(term_freq as usize, 0u32); position_reader.read(read_offset, &mut output[..]); let mut cum = offset; for output_mut in output.iter_mut() { diff --git a/src/postings/serializer.rs b/src/postings/serializer.rs index bf0d4d2ef3..f433e8ed19 100644 --- a/src/postings/serializer.rs +++ b/src/postings/serializer.rs @@ -168,7 +168,12 @@ impl<'a> FieldSerializer<'a> { /// * term - the term. It needs to come after the previous term according to the lexicographical /// order. /// * term_doc_freq - return the number of document containing the term. - pub fn new_term(&mut self, term: &[u8], term_doc_freq: u32) -> io::Result<()> { + pub fn new_term( + &mut self, + term: &[u8], + term_doc_freq: u32, + record_term_freq: bool, + ) -> io::Result<()> { assert!( !self.term_open, "Called new_term, while the previous term was not closed." @@ -177,7 +182,8 @@ impl<'a> FieldSerializer<'a> { self.postings_serializer.clear(); self.current_term_info = self.current_term_info(); self.term_dictionary_builder.insert_key(term)?; - self.postings_serializer.new_term(term_doc_freq); + self.postings_serializer + .new_term(term_doc_freq, record_term_freq); Ok(()) } @@ -330,10 +336,10 @@ impl PostingsSerializer { } } - pub fn new_term(&mut self, term_doc_freq: u32) { + pub fn new_term(&mut self, term_doc_freq: u32, record_term_freq: bool) { self.bm25_weight = None; - self.term_has_freq = self.mode.has_freq() && term_doc_freq != 0; + self.term_has_freq = self.mode.has_freq() && record_term_freq; if !self.term_has_freq { return; }