Skip to content

Commit

Permalink
Fix min doc_count empty merge bug
Browse files Browse the repository at this point in the history
This fixes an issue that occurs when min_doc_count == 0: terms are loaded
from the dictionary of one segment, and the same term is then merged with a
subaggregation from another segment.
Previously the empty structure was not correctly initialized to contain
the subaggregation, so the merge was incorrect.
  • Loading branch information
PSeitz committed May 29, 2023
1 parent e56addc commit a764d49
Showing 1 changed file with 91 additions and 1 deletion.
92 changes: 91 additions & 1 deletion src/aggregation/bucket/term_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,9 @@ impl SegmentTermCollector {
if self.req.min_doc_count == 0 {
// TODO: Handle rev streaming for descending sorting by keys
let mut stream = term_dict.dictionary().stream()?;
let empty_sub_aggregation = IntermediateAggregationResults::empty_from_req(
agg_with_accessor.agg.sub_aggregation(),
);
while let Some((key, _ord)) = stream.next() {
if dict.len() >= self.req.segment_size as usize {
break;
Expand All @@ -564,7 +567,12 @@ impl SegmentTermCollector {
.map_err(|utf8_err| DataCorruption::comment_only(utf8_err.to_string()))?
.to_string(),
);
dict.entry(key).or_default();

dict.entry(key.clone())
.or_insert_with(|| IntermediateTermBucketEntry {
doc_count: 0,
sub_aggregation: empty_sub_aggregation.clone(),
});
}
}
} else {
Expand Down Expand Up @@ -625,6 +633,9 @@ mod tests {
get_test_index_from_terms, get_test_index_from_values_and_terms,
};
use crate::aggregation::AggregationLimits;
use crate::indexer::NoMergePolicy;
use crate::schema::{Schema, FAST, STRING};
use crate::Index;

#[test]
fn terms_aggregation_test_single_segment() -> crate::Result<()> {
Expand Down Expand Up @@ -1215,6 +1226,85 @@ mod tests {
Ok(())
}

/// Regression test: with `min_doc_count == 0` and a sub-aggregation, a term that is only
/// loaded from the term dictionary in one segment (no matching docs there) must still merge
/// correctly with the same term's populated bucket coming from another segment.
#[test]
fn terms_aggregation_min_doc_count_special_case_with_sub_agg_empty_merge() -> crate::Result<()>
{
    // Two fast string fields (one aggregated on, one queried on) plus a fast u64 score.
    let mut builder = Schema::builder();
    let term_field = builder.add_text_field("string1", STRING | FAST);
    let filter_field = builder.add_text_field("string2", STRING | FAST);
    let score_options = crate::schema::NumericOptions::default().set_fast();
    let score_field = builder.add_u64_field("score", score_options);
    let index = Index::create_in_ram(builder.build());

    // One inner array per segment: (string1, string2, score).
    let segments = [
        // In the first segment the "B" doc is filtered out by the query ("nohit"),
        // but the term "B" is still loaded there because min_doc_count == 0.
        [("A", "hit", 1u64), ("B", "nohit", 2u64)],
        [("A", "hit", 2u64), ("B", "hit", 4u64)],
    ];

    {
        let mut writer = index.writer_with_num_threads(1, 20_000_000)?;
        // Keep the two commits as two separate segments.
        writer.set_merge_policy(Box::new(NoMergePolicy));
        for segment_docs in segments {
            for (term, tag, score) in segment_docs {
                writer.add_document(doc!(
                    term_field => term.to_string(),
                    filter_field => tag.to_string(),
                    score_field => score,
                ))?;
            }
            // Each commit closes one segment.
            writer.commit()?;
        }
    }

    let request_json = json!({
        "my_texts": {
            "terms": {
                "field": "string1",
                "min_doc_count": 0,
            },
            "aggs":{
                "elhistogram": {
                    "histogram": {
                        "field": "score",
                        "interval": 1
                    }
                }
            }
        }
    });
    let agg_req: Aggregations = serde_json::from_value(request_json).unwrap();

    // The query is restricted to string2 == "hit", but min_doc_count == 0 returns all terms.
    let res = exec_request_with_query(agg_req, &index, Some(("string2", "hit")))?;

    let terms_result = &res["my_texts"];
    let term_buckets = &terms_result["buckets"];

    // "A" matches in both segments: scores 1 and 2.
    let bucket_a = &term_buckets[0];
    assert_eq!(bucket_a["key"], "A");
    assert_eq!(bucket_a["doc_count"], 2);
    assert_eq!(
        bucket_a["elhistogram"]["buckets"],
        json!([{ "doc_count": 1, "key": 1.0 }, { "doc_count": 1, "key": 2.0 } ])
    );

    // "B" only matches in the second segment (score 4); the empty entry from the first
    // segment must not disturb the merged histogram.
    let bucket_b = &term_buckets[1];
    assert_eq!(bucket_b["key"], "B");
    assert_eq!(bucket_b["doc_count"], 1);
    assert_eq!(
        bucket_b["elhistogram"]["buckets"],
        json!([ { "doc_count": 1, "key": 4.0 } ])
    );

    assert_eq!(terms_result["sum_other_doc_count"], 0);
    assert_eq!(terms_result["doc_count_error_upper_bound"], 0);

    Ok(())
}

#[test]
fn terms_aggregation_error_count_test() -> crate::Result<()> {
let terms_per_segment = vec![
Expand Down

0 comments on commit a764d49

Please sign in to comment.