Skip to content

Commit

Permalink
Do not evaluate shard_size and shard_min_doc_count at segment slice l…
Browse files Browse the repository at this point in the history
…evel (#9085)

* Use BucketCountThresholds in InternalTerms and InternalAggregations and do not apply shard level thresholds at slice level for Concurrent Segment Search

Signed-off-by: Jay Deng <[email protected]>

* Addressing comments

Signed-off-by: Jay Deng <[email protected]>

* Re-introduce shardSize member to InternalMultiTerms and InternalMappedTerms

Signed-off-by: Jay Deng <[email protected]>

* Introduce LocalBucketCountThresholds for local size and min_doc_count values

Signed-off-by: Jay Deng <[email protected]>

---------

Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored Aug 9, 2023
1 parent ccf0b9a commit 15b7de0
Show file tree
Hide file tree
Showing 43 changed files with 411 additions and 273 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add support for aggregation profiler with concurrent aggregation ([#8801](https://github.com/opensearch-project/OpenSearch/pull/8801))
- [Remove] Deprecated Fractional ByteSizeValue support #9005 ([#9005](https://github.com/opensearch-project/OpenSearch/pull/9005))
- Make MultiBucketConsumerService thread safe to use across slices during search ([#9047](https://github.com/opensearch-project/OpenSearch/pull/9047))
- Change shard_size and shard_min_doc_count evaluation to happen in shard level reduce phase ([#9085](https://github.com/opensearch-project/OpenSearch/pull/9085))

### Deprecated

Expand All @@ -129,4 +130,4 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
### Security

[Unreleased 3.0]: https://github.com/opensearch-project/OpenSearch/compare/2.x...HEAD
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
[Unreleased 2.x]: https://github.com/opensearch-project/OpenSearch/compare/2.10...2.x
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.aggregations.MultiBucketConsumerService;
import org.opensearch.search.aggregations.bucket.terms.StringTerms;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.query.QuerySearchResult;
Expand Down Expand Up @@ -170,15 +171,14 @@ private StringTerms newTerms(Random rand, BytesRef[] dict, boolean withNested) {
"terms",
BucketOrder.key(true),
BucketOrder.count(false),
topNSize,
1,
Collections.emptyMap(),
DocValueFormat.RAW,
numShards,
true,
0,
buckets,
0
0,
new TermsAggregator.BucketCountThresholds(1, 0, topNSize, numShards)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;

import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -86,15 +87,14 @@ private StringTerms newTerms(boolean withNested) {
"test",
BucketOrder.key(true),
BucketOrder.key(true),
buckets,
1,
null,
DocValueFormat.RAW,
buckets,
false,
100000,
resultBuckets,
0
0,
new TermsAggregator.BucketCountThresholds(1, 0, buckets, buckets)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* aggregation operators
*/
class AggregationCollectorManager implements CollectorManager<Collector, ReduceableSearchResult> {
private final SearchContext context;
protected final SearchContext context;
private final CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider;
private final String collectorReason;

Expand Down Expand Up @@ -63,18 +63,11 @@ public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IO
}

final InternalAggregations internalAggregations = InternalAggregations.from(internals);
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce iff multiple slices
// were created to execute this request and it used concurrent segment search path
// TODO: Add the check for flag that the request was executed using concurrent search
if (collectors.size() > 1) {
// using reduce is fine here instead of topLevelReduce as pipeline aggregation is evaluated on the coordinator after all
// documents are collected across shards for an aggregation
return new AggregationReduceableSearchResult(
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard())
);
} else {
return new AggregationReduceableSearchResult(internalAggregations);
}
return buildAggregationResult(internalAggregations);
}

protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
return new AggregationReduceableSearchResult(internalAggregations);
}

static Collector createCollector(SearchContext context, List<Aggregator> collectors, String reason) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.search.profile.query.CollectorResult;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;

/**
Expand All @@ -38,4 +39,13 @@ public Collector newCollector() throws IOException {
return super.newCollector();
}
}

@Override
protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices
// were created so that we can apply shard level bucket count thresholds in the reduce phase.
return new AggregationReduceableSearchResult(
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator;
import org.opensearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;
import org.opensearch.search.aggregations.support.AggregationPath;
Expand Down Expand Up @@ -160,6 +162,18 @@ public boolean isSliceLevel() {
return this.isSliceLevel;
}

/**
* For slice level partial reduce we will apply shard level `shard_size` and `shard_min_doc_count` limits
* whereas for coordinator level partial reduce it will use top level `size` and `min_doc_count`
*/
public LocalBucketCountThresholds asLocalBucketCountThresholds(TermsAggregator.BucketCountThresholds bucketCountThresholds) {
if (isSliceLevel()) {
return new LocalBucketCountThresholds(bucketCountThresholds.getShardMinDocCount(), bucketCountThresholds.getShardSize());
} else {
return new LocalBucketCountThresholds(bucketCountThresholds.getMinDocCount(), bucketCountThresholds.getRequiredSize());
}
}

public BigArrays bigArrays() {
return bigArrays;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.search.profile.query.CollectorResult;

import java.io.IOException;
import java.util.Collections;
import java.util.Objects;

/**
Expand All @@ -38,4 +39,13 @@ public Collector newCollector() throws IOException {
return super.newCollector();
}
}

@Override
protected AggregationReduceableSearchResult buildAggregationResult(InternalAggregations internalAggregations) {
// Reduce the aggregations across slices before sending to the coordinator. We will perform shard level reduce as long as any slices
// were created so that we can apply shard level bucket count thresholds in the reduce phase.
return new AggregationReduceableSearchResult(
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partialOnShard())
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations.bucket;

import org.opensearch.search.aggregations.bucket.terms.TermsAggregator;

/**
* BucketCountThresholds type that holds the local (either shard level or request level) bucket count thresholds in minDocCount and requireSize fields.
* Similar to {@link TermsAggregator.BucketCountThresholds} however only provides getters for the local members and no setters.
*
* @opensearch.internal
*/
public class LocalBucketCountThresholds {

private final long minDocCount;
private final int requiredSize;

public LocalBucketCountThresholds(long localminDocCount, int localRequiredSize) {
this.minDocCount = localminDocCount;
this.requiredSize = localRequiredSize;
}

public int getRequiredSize() {
return requiredSize;
}

public long getMinDocCount() {
return minDocCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,14 @@ protected StringTerms buildEmptyTermsAggregation() {
name,
order,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
bucketCountThresholds.getShardSize(),
showTermDocCountError,
0,
emptyList(),
0
0,
bucketCountThresholds
);
}

Expand All @@ -95,14 +94,13 @@ protected SignificantStringTerms buildEmptySignificantTermsAggregation(long subs
int supersetSize = topReader.numDocs();
return new SignificantStringTerms(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
subsetSize,
supersetSize,
significanceHeuristic,
emptyList()
emptyList(),
bucketCountThresholds
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -130,29 +130,27 @@ public DoubleTerms(
String name,
BucketOrder reduceOrder,
BucketOrder order,
int requiredSize,
long minDocCount,
Map<String, Object> metadata,
DocValueFormat format,
int shardSize,
boolean showTermDocCountError,
long otherDocCount,
List<Bucket> buckets,
long docCountError
long docCountError,
TermsAggregator.BucketCountThresholds bucketCountThresholds
) {
super(
name,
reduceOrder,
order,
requiredSize,
minDocCount,
metadata,
format,
shardSize,
showTermDocCountError,
otherDocCount,
buckets,
docCountError
docCountError,
bucketCountThresholds
);
}

Expand All @@ -174,15 +172,14 @@ public DoubleTerms create(List<Bucket> buckets) {
name,
reduceOrder,
order,
requiredSize,
minDocCount,
metadata,
format,
shardSize,
showTermDocCountError,
otherDocCount,
buckets,
docCountError
docCountError,
bucketCountThresholds
);
}

Expand All @@ -204,15 +201,14 @@ protected DoubleTerms create(String name, List<Bucket> buckets, BucketOrder redu
name,
reduceOrder,
order,
requiredSize,
minDocCount,
getMetadata(),
format,
shardSize,
showTermDocCountError,
otherDocCount,
buckets,
docCountError
docCountError,
bucketCountThresholds
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.search.aggregations.InternalOrder;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.search.aggregations.LeafBucketCollectorBase;
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
import org.opensearch.search.aggregations.bucket.terms.SignificanceLookup.BackgroundFrequencyForBytes;
import org.opensearch.search.aggregations.bucket.terms.heuristic.SignificanceHeuristic;
import org.opensearch.search.aggregations.support.ValuesSource;
Expand Down Expand Up @@ -603,6 +604,7 @@ abstract class ResultStrategy<
TB extends InternalMultiBucketAggregation.InternalBucket> implements Releasable {

private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws IOException {
LocalBucketCountThresholds localBucketCountThresholds = context.asLocalBucketCountThresholds(bucketCountThresholds);
if (valueCount == 0) { // no context in this reader
InternalAggregation[] results = new InternalAggregation[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
Expand All @@ -615,11 +617,11 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
long[] otherDocCount = new long[owningBucketOrds.length];
for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) {
final int size;
if (bucketCountThresholds.getMinDocCount() == 0) {
if (localBucketCountThresholds.getMinDocCount() == 0) {
// if minDocCount == 0 then we can end up with more buckets then maxBucketOrd() returns
size = (int) Math.min(valueCount, bucketCountThresholds.getShardSize());
size = (int) Math.min(valueCount, localBucketCountThresholds.getRequiredSize());
} else {
size = (int) Math.min(maxBucketOrd(), bucketCountThresholds.getShardSize());
size = (int) Math.min(maxBucketOrd(), localBucketCountThresholds.getRequiredSize());
}
PriorityQueue<TB> ordered = buildPriorityQueue(size);
final int finalOrdIdx = ordIdx;
Expand All @@ -630,7 +632,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws
@Override
public void accept(long globalOrd, long bucketOrd, long docCount) throws IOException {
otherDocCount[finalOrdIdx] += docCount;
if (docCount >= bucketCountThresholds.getShardMinDocCount()) {
if (docCount >= localBucketCountThresholds.getMinDocCount()) {
if (spare == null) {
spare = buildEmptyTemporaryBucket();
}
Expand Down Expand Up @@ -799,15 +801,14 @@ StringTerms buildResult(long owningBucketOrd, long otherDocCount, StringTerms.Bu
name,
reduceOrder,
order,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
bucketCountThresholds.getShardSize(),
showTermDocCountError,
otherDocCount,
Arrays.asList(topBuckets),
0
0,
bucketCountThresholds
);
}

Expand Down Expand Up @@ -924,14 +925,13 @@ void buildSubAggs(SignificantStringTerms.Bucket[][] topBucketsPreOrd) throws IOE
SignificantStringTerms buildResult(long owningBucketOrd, long otherDocCount, SignificantStringTerms.Bucket[] topBuckets) {
return new SignificantStringTerms(
name,
bucketCountThresholds.getRequiredSize(),
bucketCountThresholds.getMinDocCount(),
metadata(),
format,
subsetSize(owningBucketOrd),
supersetSize,
significanceHeuristic,
Arrays.asList(topBuckets)
Arrays.asList(topBuckets),
bucketCountThresholds
);
}

Expand Down
Loading

0 comments on commit 15b7de0

Please sign in to comment.