Change InternalSignificantTerms to only sum shard level counts in final reduce #8735

Merged · 1 commit · Jul 20, 2023
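Background on the change: supersetSize (the bg_count of a significant_terms aggregation) is a shard-level count. With concurrent segment search, every segment slice of a shard reports that same shard-level value, so summing it during the slice-level partial reduce inflates it by the number of slices that collected a bucket. A minimal standalone sketch of the arithmetic, with invented numbers (an illustration, not OpenSearch code):

    // Illustration only; invented numbers, not OpenSearch code.
    long shardSupersetSize = 100;  // shard-level background count (bg_count)
    int slicesWithBucket = 3;      // segment slices that collected a bucket for the key

    // Before this change: the slice-level reduce summed the shard-level count once per slice.
    long buggy = 0;
    for (int i = 0; i < slicesWithBucket; i++) {
        buggy += shardSupersetSize; // 300 = num_slices_with_bucket * supersetSize
    }

    // After this change: the slice-level reduce keeps a single copy; summing happens
    // only in the final reduce, where each count comes from a distinct shard.
    long fixed = shardSupersetSize; // 100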
CHANGELOG.md (1 change: 1 addition, 0 deletions)

@@ -84,6 +84,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 ### Changed
 - Perform aggregation postCollection in ContextIndexSearcher after searching leaves ([#8303](https://github.com/opensearch-project/OpenSearch/pull/8303))
 - Make Span exporter configurable ([#8620](https://github.com/opensearch-project/OpenSearch/issues/8620))
+- Change InternalSignificantTerms to sum shard-level superset counts only in final reduce ([#8735](https://github.com/opensearch-project/OpenSearch/pull/8735))
 
 ### Deprecated
@@ -31,6 +31,7 @@
 
 package org.opensearch.search.aggregations.bucket;
 
+import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
 import org.opensearch.action.index.IndexRequestBuilder;
 import org.opensearch.action.search.SearchRequestBuilder;
 import org.opensearch.action.search.SearchResponse;
@@ -42,6 +43,7 @@
 import org.opensearch.common.xcontent.XContentType;
 import org.opensearch.index.query.QueryBuilder;
 import org.opensearch.index.query.QueryBuilders;
+import org.opensearch.index.query.TermQueryBuilder;
 import org.opensearch.plugins.Plugin;
 import org.opensearch.plugins.SearchPlugin;
 import org.opensearch.script.MockScriptPlugin;
@@ -210,6 +212,34 @@ public void testXContentResponse() throws Exception {
 
     }
 
+    public void testConsistencyWithDifferentShardCounts() throws Exception {
+        // The purpose of this test is to validate that the aggregation results do not change with the shard count.
+        // bg_count for the significant terms agg is summed across shards, so we compare a one-shard and a two-shard search request.
+        String type = randomBoolean() ? "text" : "long";
+        String settings = "{\"index.number_of_shards\": 1, \"index.number_of_replicas\": 0}";
+        SharedSignificantTermsTestMethods.index01Docs(type, settings, this);
+
+        SearchRequestBuilder request = client().prepareSearch(INDEX_NAME)
+            .setQuery(new TermQueryBuilder(CLASS_FIELD, "0"))
+            .addAggregation(significantTerms("sig_terms").field(TEXT_FIELD));
+
+        SearchResponse response1 = request.get();
+
+        assertAcked(client().admin().indices().delete(new DeleteIndexRequest("*")).get());
+
+        settings = "{\"index.number_of_shards\": 2, \"index.number_of_replicas\": 0}";
+        // We use a custom routing strategy here to ensure that each shard will have at least one bucket.
+        // If no buckets are collected for a shard, that will affect the scoring and bg_count, and our assertion
+        // will not hold.
+        SharedSignificantTermsTestMethods.index01DocsWithRouting(type, settings, this);
+        SearchResponse response2 = request.get();
+
+        assertEquals(
+            response1.getAggregations().asMap().get("sig_terms").toString(),
+            response2.getAggregations().asMap().get("sig_terms").toString()
+        );
+    }
+
     public void testPopularTermManyDeletedDocs() throws Exception {
         String settings = "{\"index.number_of_shards\": 1, \"index.number_of_replicas\": 0}";
         assertAcked(
@@ -917,8 +917,10 @@ public ReaderContext readerContext() {
     }
 
     @Override
-    public InternalAggregation.ReduceContext partial() {
-        return requestToAggReduceContextBuilder.apply(request.source()).forPartialReduction();
+    public InternalAggregation.ReduceContext partialOnShard() {
+        InternalAggregation.ReduceContext rc = requestToAggReduceContextBuilder.apply(request.source()).forPartialReduction();
+        rc.setSliceLevel(isConcurrentSegmentSearchEnabled());
+        return rc;
     }
 
     @Override
@@ -70,7 +70,7 @@ public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
             // using reduce is fine here instead of topLevelReduce as pipeline aggregation is evaluated on the coordinator after all
             // documents are collected across shards for an aggregation
             return new AggregationReduceableSearchResult(
-                InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partial())
+                InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard())
             );
         } else {
             return new AggregationReduceableSearchResult(internalAggregations);
@@ -89,6 +89,8 @@ public static class ReduceContext {
         private final ScriptService scriptService;
         private final IntConsumer multiBucketConsumer;
         private final PipelineTree pipelineTreeRoot;
+
+        private boolean isSliceLevel;
         /**
          * Supplies the pipelines when the result of the reduce is serialized
          * to node versions that need pipeline aggregators to be serialized
@@ -138,6 +140,7 @@ private ReduceContext(
             this.multiBucketConsumer = multiBucketConsumer;
             this.pipelineTreeRoot = pipelineTreeRoot;
             this.pipelineTreeForBwcSerialization = pipelineTreeForBwcSerialization;
+            this.isSliceLevel = false;
         }
 
         /**
@@ -149,6 +152,14 @@ public boolean isFinalReduce() {
             return pipelineTreeRoot != null;
        }
 
+        public void setSliceLevel(boolean sliceLevel) {
+            this.isSliceLevel = sliceLevel;
+        }
+
+        public boolean isSliceLevel() {
+            return this.isSliceLevel;
+        }
+
         public BigArrays bigArrays() {
             return bigArrays;
         }
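The two accessors above let an aggregation's reduce distinguish a slice-level reduce (partial results from slices of one shard) from a cross-shard reduce. A sketch of the intended pattern, where MyAgg is a hypothetical aggregation type holding a shard-local counter (illustrative only; the real consumer is InternalSignificantTerms below):

    long total = 0;
    for (InternalAggregation aggregation : aggregations) {
        long shardLocalCount = ((MyAgg) aggregation).shardLocalCount(); // MyAgg is hypothetical
        if (reduceContext.isSliceLevel()) {
            total = shardLocalCount;  // slices duplicate the shard-level value: keep one copy
        } else {
            total += shardLocalCount; // distinct shards hold disjoint counts: sum them
        }
    }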
@@ -232,7 +232,13 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, ReduceContext reduceContext) {
             @SuppressWarnings("unchecked")
             InternalSignificantTerms<A, B> terms = (InternalSignificantTerms<A, B>) aggregation;
             globalSubsetSize += terms.getSubsetSize();
-            globalSupersetSize += terms.getSupersetSize();
+            // supersetSize is a shard-level count; if we sum it across slices we would produce num_slices_with_bucket * supersetSize,
+            // where num_slices_with_bucket is the number of segment slices that have collected a bucket for the key
+            if (reduceContext.isSliceLevel()) {
+                globalSupersetSize = terms.getSupersetSize();
+            } else {
+                globalSupersetSize += terms.getSupersetSize();
+            }
         }
         Map<String, List<B>> buckets = new HashMap<>();
         for (InternalAggregation aggregation : aggregations) {
@@ -291,7 +297,13 @@ protected B reduceBucket(List<B> buckets, ReduceContext context) {
         List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
         for (B bucket : buckets) {
             subsetDf += bucket.subsetDf;
-            supersetDf += bucket.supersetDf;
+            // supersetDf is a shard-level count; if we sum it across slices we would produce num_slices_with_bucket * supersetDf,
+            // where num_slices_with_bucket is the number of segment slices that have collected a bucket for the key
+            if (context.isSliceLevel()) {
+                supersetDf = bucket.supersetDf;
+            } else {
+                supersetDf += bucket.supersetDf;
+            }
             aggregationsList.add(bucket.aggregations);
         }
         InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
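To make the guarded sum concrete, a worked example with invented numbers: a shard whose background set contains 10 documents is searched as two concurrent slices, and both slices collect a bucket for the term.

    // Invented numbers for illustration.
    // Slice-level reduce on the shard (isSliceLevel() == true):
    long sliceA = 10, sliceB = 10;  // both slices report the same shard-level supersetDf
    long shardSupersetDf = sliceA;  // keep one copy: 10, not 20

    // Final reduce on the coordinator (isSliceLevel() == false):
    long otherShardSupersetDf = 12; // counts from different shards are disjoint
    long globalSupersetDf = shardSupersetDf + otherShardSupersetDf; // 22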
@@ -546,8 +546,8 @@ public ReaderContext readerContext() {
     }
 
     @Override
-    public InternalAggregation.ReduceContext partial() {
-        return in.partial();
+    public InternalAggregation.ReduceContext partialOnShard() {
+        return in.partialOnShard();
     }
 
     @Override
@@ -465,7 +465,7 @@ public String toString() {
 
     public abstract ReaderContext readerContext();
 
-    public abstract InternalAggregation.ReduceContext partial();
+    public abstract InternalAggregation.ReduceContext partialOnShard();
 
     // processor used for bucket collectors
     public abstract void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor);
@@ -113,4 +113,41 @@ public static void index01Docs(String type, String settings, OpenSearchIntegTestCase testCase)
         indexRequestBuilderList.add(client().prepareIndex(INDEX_NAME).setId("7").setSource(TEXT_FIELD, "0", CLASS_FIELD, "0"));
         testCase.indexRandom(true, false, indexRequestBuilderList);
     }
+
+    public static void index01DocsWithRouting(String type, String settings, OpenSearchIntegTestCase testCase) throws ExecutionException,
+        InterruptedException {
+        String textMappings = "type=" + type;
+        if (type.equals("text")) {
+            textMappings += ",fielddata=true";
+        }
+        assertAcked(
+            testCase.prepareCreate(INDEX_NAME)
+                .setSettings(settings, XContentType.JSON)
+                .setMapping("text", textMappings, CLASS_FIELD, "type=keyword")
+        );
+        String[] gb = { "0", "1" };
+        List<IndexRequestBuilder> indexRequestBuilderList = new ArrayList<>();
+        indexRequestBuilderList.add(
+            client().prepareIndex(INDEX_NAME).setId("1").setSource(TEXT_FIELD, "1", CLASS_FIELD, "1").setRouting("0")
+        );
+        indexRequestBuilderList.add(
+            client().prepareIndex(INDEX_NAME).setId("2").setSource(TEXT_FIELD, "1", CLASS_FIELD, "1").setRouting("0")
+        );
+        indexRequestBuilderList.add(
+            client().prepareIndex(INDEX_NAME).setId("3").setSource(TEXT_FIELD, "0", CLASS_FIELD, "0").setRouting("0")
+        );
+        indexRequestBuilderList.add(
+            client().prepareIndex(INDEX_NAME).setId("4").setSource(TEXT_FIELD, "0", CLASS_FIELD, "0").setRouting("1")
+        );
+        indexRequestBuilderList.add(
+            client().prepareIndex(INDEX_NAME).setId("5").setSource(TEXT_FIELD, gb, CLASS_FIELD, "1").setRouting("1")
+        );
+        indexRequestBuilderList.add(
+            client().prepareIndex(INDEX_NAME).setId("6").setSource(TEXT_FIELD, gb, CLASS_FIELD, "0").setRouting("0")
+        );
+        indexRequestBuilderList.add(
+            client().prepareIndex(INDEX_NAME).setId("7").setSource(TEXT_FIELD, "0", CLASS_FIELD, "0").setRouting("0")
+        );
+        testCase.indexRandom(true, false, indexRequestBuilderList);
+    }
 }
@@ -659,7 +659,7 @@ public ReaderContext readerContext() {
     }
 
     @Override
-    public InternalAggregation.ReduceContext partial() {
+    public InternalAggregation.ReduceContext partialOnShard() {
         return InternalAggregationTestCase.emptyReduceContextBuilder().forPartialReduction();
     }
 