diff --git a/src/aggregation/bucket/term_agg.rs b/src/aggregation/bucket/term_agg.rs index 7b33529509..fd04f3ed2b 100644 --- a/src/aggregation/bucket/term_agg.rs +++ b/src/aggregation/bucket/term_agg.rs @@ -554,6 +554,9 @@ impl SegmentTermCollector { if self.req.min_doc_count == 0 { // TODO: Handle rev streaming for descending sorting by keys let mut stream = term_dict.dictionary().stream()?; + let empty_sub_aggregation = IntermediateAggregationResults::empty_from_req( + agg_with_accessor.agg.sub_aggregation(), + ); while let Some((key, _ord)) = stream.next() { if dict.len() >= self.req.segment_size as usize { break; @@ -564,7 +567,12 @@ impl SegmentTermCollector { .map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))? .to_string(), ); - dict.entry(key).or_default(); + + dict.entry(key.clone()) + .or_insert_with(|| IntermediateTermBucketEntry { + doc_count: 0, + sub_aggregation: empty_sub_aggregation.clone(), + }); } } } else { @@ -625,6 +633,9 @@ mod tests { get_test_index_from_terms, get_test_index_from_values_and_terms, }; use crate::aggregation::AggregationLimits; + use crate::indexer::NoMergePolicy; + use crate::schema::{Schema, FAST, STRING}; + use crate::Index; #[test] fn terms_aggregation_test_single_segment() -> crate::Result<()> { @@ -1215,6 +1226,85 @@ mod tests { Ok(()) } + #[test] + fn terms_aggregation_min_doc_count_special_case_with_sub_agg_empty_merge() -> crate::Result<()> + { + let mut schema_builder = Schema::builder(); + let string_field_1 = schema_builder.add_text_field("string1", STRING | FAST); + let string_field_2 = schema_builder.add_text_field("string2", STRING | FAST); + let score_fieldtype = crate::schema::NumericOptions::default().set_fast(); + let score_field = schema_builder.add_u64_field("score", score_fieldtype); + let index = Index::create_in_ram(schema_builder.build()); + { + let mut index_writer = index.writer_with_num_threads(1, 20_000_000)?; + index_writer.set_merge_policy(Box::new(NoMergePolicy)); + // writing the segment + index_writer.add_document(doc!( + string_field_1 => "A".to_string(), + string_field_2 => "hit".to_string(), + score_field => 1u64, + ))?; + index_writer.add_document(doc!( + string_field_1 => "B".to_string(), + string_field_2 => "nohit".to_string(), // this doc gets filtered in this segment, + // but the term will still be loaded because + // min_doc_count == 0 + score_field => 2u64, + ))?; + index_writer.commit()?; + + index_writer.add_document(doc!( + string_field_1 => "A".to_string(), + string_field_2 => "hit".to_string(), + score_field => 2u64, + ))?; + index_writer.add_document(doc!( + string_field_1 => "B".to_string(), + string_field_2 => "hit".to_string(), + score_field => 4u64, + ))?; + index_writer.commit()?; + } + + let agg_req: Aggregations = serde_json::from_value(json!({ + "my_texts": { + "terms": { + "field": "string1", + "min_doc_count": 0, + }, + "aggs":{ + "elhistogram": { + "histogram": { + "field": "score", + "interval": 1 + } + } + } + } + })) + .unwrap(); + + // searching for terma, but min_doc_count will return all terms + let res = exec_request_with_query(agg_req, &index, Some(("string2", "hit")))?; + + assert_eq!(res["my_texts"]["buckets"][0]["key"], "A"); + assert_eq!(res["my_texts"]["buckets"][0]["doc_count"], 2); + assert_eq!( + res["my_texts"]["buckets"][0]["elhistogram"]["buckets"], + json!([{ "doc_count": 1, "key": 1.0 }, { "doc_count": 1, "key": 2.0 } ]) + ); + assert_eq!(res["my_texts"]["buckets"][1]["key"], "B"); + assert_eq!(res["my_texts"]["buckets"][1]["doc_count"], 1); + assert_eq!( + res["my_texts"]["buckets"][1]["elhistogram"]["buckets"], + json!([ { "doc_count": 1, "key": 4.0 } ]) + ); + assert_eq!(res["my_texts"]["sum_other_doc_count"], 0); + assert_eq!(res["my_texts"]["doc_count_error_upper_bound"], 0); + + Ok(()) + } + #[test] fn terms_aggregation_error_count_test() -> crate::Result<()> { let terms_per_segment = vec![