From 03318f9eb0dd2aa7e8f7eab480789ab3838204ad Mon Sep 17 00:00:00 2001 From: iverase Date: Tue, 22 Aug 2023 17:10:17 +0800 Subject: [PATCH 01/12] Move Aggregator#buildTopLevel() to search worker thread. --- .../search/aggregations/AggregationPhase.java | 87 +++++++------------ .../aggregations/AggregatorCollector.java | 52 +++++++++++ .../SearchContextAggregations.java | 24 +---- .../query/InternalProfileCollector.java | 6 +- .../search/query/QueryPhase.java | 1 - .../search/query/QueryPhaseCollector.java | 9 +- .../query/QueryPhaseCollectorManager.java | 25 +++--- .../elasticsearch/test/ESIntegTestCase.java | 1 + 8 files changed, 107 insertions(+), 98 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/AggregatorCollector.java diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index 6dc05fa8fe843..7da7c10b96257 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -7,7 +7,6 @@ */ package org.elasticsearch.search.aggregations; -import org.apache.lucene.search.Collector; import org.apache.lucene.search.CollectorManager; import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.search.aggregations.support.TimeSeriesIndexSearcher; @@ -31,36 +30,54 @@ public static void preProcess(SearchContext context) { if (context.aggregations() == null) { return; } - final Supplier collectorSupplier; + final Supplier collectorSupplier; if (context.aggregations().isInSortOrderExecutionRequired()) { - executeInSortOrder(context, newBucketCollector(context)); - collectorSupplier = () -> BucketCollector.NO_OP_COLLECTOR; + AggregatorCollector collector = newAggregatorCollector(context); + executeInSortOrder(context, collector.bucketCollector); + collectorSupplier = () -> new AggregatorCollector(collector.aggregators, BucketCollector.NO_OP_BUCKET_COLLECTOR); } else { - collectorSupplier = () -> newBucketCollector(context).asCollector(); + collectorSupplier = () -> newAggregatorCollector(context); } context.aggregations().registerAggsCollectorManager(new CollectorManager<>() { @Override - public Collector newCollector() { + public AggregatorCollector newCollector() { return collectorSupplier.get(); } @Override - public Void reduce(Collection collectors) { - // we cannot run post-collection method here because we need to do it after the optional timeout - // has been removed from the index searcher. Therefore, we delay this processing to the - // AggregationPhase#execute method. + public Void reduce(Collection collectors) { + if (context.queryResult().hasAggs()) { + // no need to compute the aggs twice, they should be computed on a per context basis + return null; + } + if (collectors.size() > 1) { + // we execute this search using more than one slice. In order to keep memory requirements + // low, we do a partial reduction here. 
+ final List internalAggregations = new ArrayList<>(collectors.size()); + collectors.forEach(c -> internalAggregations.add(InternalAggregations.from(c.internalAggregations))); + context.queryResult() + .aggregations( + InternalAggregations.topLevelReduce( + internalAggregations, + context.aggregations().getAggregationReduceContextBuilder().forPartialReduction() + ) + ); + } else if (collectors.size() == 1) { + context.queryResult().aggregations(InternalAggregations.from(collectors.iterator().next().internalAggregations)); + } + // disable aggregations so that they don't run on next pages in case of scrolling + context.aggregations(null); return null; } }); } - private static BucketCollector newBucketCollector(SearchContext context) { + private static AggregatorCollector newAggregatorCollector(SearchContext context) { try { Aggregator[] aggregators = context.aggregations().factories().createTopLevelAggregators(); - context.aggregations().aggregators(aggregators); BucketCollector bucketCollector = MultiBucketCollector.wrap(true, List.of(aggregators)); bucketCollector.preCollection(); - return bucketCollector; + return new AggregatorCollector(aggregators, bucketCollector); } catch (IOException e) { throw new AggregationInitializationException("Could not initialize aggregators", e); } @@ -97,48 +114,4 @@ private static List getCancellationChecks(SearchContext context) { return cancellationChecks; } - - public static void execute(SearchContext context) { - if (context.aggregations() == null) { - context.queryResult().aggregations(null); - return; - } - - if (context.queryResult().hasAggs()) { - // no need to compute the aggs twice, they should be computed on a per context basis - return; - } - - final List internalAggregations = new ArrayList<>(context.aggregations().aggregators().size()); - for (Aggregator[] aggregators : context.aggregations().aggregators()) { - final List aggregations = new ArrayList<>(aggregators.length); - for (Aggregator aggregator : aggregators) { - try { - aggregations.add(aggregator.buildTopLevel()); - } catch (IOException e) { - throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e); - } - // release the aggregator to claim the used bytes as we don't need it anymore - aggregator.releaseAggregations(); - } - internalAggregations.add(InternalAggregations.from(aggregations)); - } - - if (internalAggregations.size() > 1) { - // we execute this search using more than one slice. In order to keep memory requirements - // low, we do a partial reduction here. - context.queryResult() - .aggregations( - InternalAggregations.topLevelReduce( - internalAggregations, - context.aggregations().getAggregationReduceContextBuilder().forPartialReduction() - ) - ); - } else { - context.queryResult().aggregations(internalAggregations.get(0)); - } - - // disable aggregations so that they don't run on next pages in case of scrolling - context.aggregations(null); - } } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorCollector.java new file mode 100644 index 0000000000000..fef12a19872ed --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorCollector.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.search.aggregations; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.Collector; +import org.apache.lucene.search.LeafCollector; +import org.apache.lucene.search.ScoreMode; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** Collector that controls the life cycle of an aggregation document collection. */ +public class AggregatorCollector implements Collector { + final Aggregator[] aggregators; + final BucketCollector bucketCollector; + final List internalAggregations; + + public AggregatorCollector(Aggregator[] aggregators, BucketCollector bucketCollector) { + this.aggregators = aggregators; + this.bucketCollector = bucketCollector; + this.internalAggregations = new ArrayList<>(aggregators.length); + } + + @Override + public LeafCollector getLeafCollector(LeafReaderContext context) throws IOException { + return bucketCollector.getLeafCollector(new AggregationExecutionContext(context, null, null, null)); + } + + @Override + public ScoreMode scoreMode() { + return bucketCollector.scoreMode(); + } + + /** Should be call after collecting the documents. It generates the internal aggregations which are + * stored on {@code internalAggregations} */ + public void finish() throws IOException { + bucketCollector.postCollection(); + for (Aggregator aggregator : aggregators) { + internalAggregations.add(aggregator.buildTopLevel()); + // release the aggregator to claim the used bytes as we don't need it anymore + aggregator.releaseAggregations(); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/SearchContextAggregations.java b/server/src/main/java/org/elasticsearch/search/aggregations/SearchContextAggregations.java index 2b7e2f49d6eff..361b9e8da557f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/SearchContextAggregations.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/SearchContextAggregations.java @@ -7,11 +7,8 @@ */ package org.elasticsearch.search.aggregations; -import org.apache.lucene.search.Collector; import org.apache.lucene.search.CollectorManager; -import java.util.ArrayList; -import java.util.List; import java.util.function.Supplier; /** @@ -21,8 +18,7 @@ public class SearchContextAggregations { private final AggregatorFactories factories; private final Supplier toAggregationReduceContextBuilder; - private final List aggregators; - private CollectorManager aggCollectorManager; + private CollectorManager aggCollectorManager; /** * Creates a new aggregation context with the parsed aggregator factories @@ -33,37 +29,23 @@ public SearchContextAggregations( ) { this.factories = factories; this.toAggregationReduceContextBuilder = toAggregationReduceContextBuilder; - this.aggregators = new ArrayList<>(); } public AggregatorFactories factories() { return factories; } - public List aggregators() { - return aggregators; - } - - /** - * Registers all the created aggregators (top level aggregators) for the search execution context. - * - * @param aggregators The top level aggregators of the search execution. 
- */ - public void aggregators(Aggregator[] aggregators) { - this.aggregators.add(aggregators); - } - /** * Registers the collector to be run for the aggregations phase */ - public void registerAggsCollectorManager(CollectorManager aggCollectorManager) { + public void registerAggsCollectorManager(CollectorManager aggCollectorManager) { this.aggCollectorManager = aggCollectorManager; } /** * Returns the collector to be run for the aggregations phase */ - public CollectorManager getAggsCollectorManager() { + public CollectorManager getAggsCollectorManager() { return aggCollectorManager; } diff --git a/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java b/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java index 15003cb075fb3..cc6f8b591e0c6 100644 --- a/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java +++ b/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java @@ -10,7 +10,7 @@ import org.apache.lucene.sandbox.search.ProfilerCollector; import org.apache.lucene.search.Collector; -import org.elasticsearch.search.aggregations.BucketCollector; +import org.elasticsearch.search.aggregations.AggregatorCollector; import org.elasticsearch.search.query.QueryPhaseCollector; import java.io.IOException; @@ -83,8 +83,8 @@ public void doPostCollection() throws IOException { profileCollector.doPostCollection(); } else if (wrappedCollector instanceof QueryPhaseCollector queryPhaseCollector) { queryPhaseCollector.doPostCollection(); - } else if (wrappedCollector instanceof BucketCollector.BucketCollectorWrapper aggsCollector) { - aggsCollector.bucketCollector().postCollection(); + } else if (wrappedCollector instanceof AggregatorCollector aggsCollector) { + aggsCollector.finish(); } } } diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index d7d19059cd56d..1761855fb5a2f 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -135,7 +135,6 @@ static void executeQuery(SearchContext searchContext) throws QueryPhaseExecution RescorePhase.execute(searchContext); SuggestPhase.execute(searchContext); - AggregationPhase.execute(searchContext); if (searchContext.getProfilers() != null) { searchContext.queryResult().profileResults(searchContext.getProfilers().buildQueryPhaseResults()); diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhaseCollector.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhaseCollector.java index ffdae41eb331f..6de020e3c5c75 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhaseCollector.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhaseCollector.java @@ -22,7 +22,8 @@ import org.apache.lucene.search.Weight; import org.apache.lucene.util.Bits; import org.elasticsearch.common.lucene.Lucene; -import org.elasticsearch.search.aggregations.BucketCollector; +import org.elasticsearch.search.aggregations.AggregatorCollector; +import org.elasticsearch.search.aggregations.InternalAggregations; import org.elasticsearch.search.profile.query.InternalProfileCollector; import java.io.IOException; @@ -42,6 +43,8 @@ */ public final class QueryPhaseCollector implements Collector { private final Collector aggsCollector; + // populated during post collection phase + private 
InternalAggregations internalAggregations; private final Collector topDocsCollector; private final TerminateAfterChecker terminateAfterChecker; private final Weight postFilterWeight; @@ -375,8 +378,8 @@ boolean incrementHitCountAndCheckThreshold() { }; public void doPostCollection() throws IOException { - if (aggsCollector instanceof BucketCollector.BucketCollectorWrapper bucketCollectorWrapper) { - bucketCollectorWrapper.bucketCollector().postCollection(); + if (aggsCollector instanceof AggregatorCollector aggregatorCollector) { + aggregatorCollector.finish(); } else if (aggsCollector instanceof InternalProfileCollector profileCollector) { profileCollector.doPostCollection(); } diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhaseCollectorManager.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhaseCollectorManager.java index afdfbd785908c..86a01756d247e 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhaseCollectorManager.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhaseCollectorManager.java @@ -50,6 +50,7 @@ import org.elasticsearch.lucene.grouping.SinglePassGroupingCollector; import org.elasticsearch.lucene.grouping.TopFieldGroups; import org.elasticsearch.search.DocValueFormat; +import org.elasticsearch.search.aggregations.AggregatorCollector; import org.elasticsearch.search.collapse.CollapseContext; import org.elasticsearch.search.internal.ScrollContext; import org.elasticsearch.search.internal.SearchContext; @@ -76,14 +77,14 @@ abstract class QueryPhaseCollectorManager implements CollectorManager { private final Weight postFilterWeight; private final QueryPhaseCollector.TerminateAfterChecker terminateAfterChecker; - private final CollectorManager aggsCollectorManager; + private final CollectorManager aggsCollectorManager; private final Float minScore; private final boolean profile; QueryPhaseCollectorManager( Weight postFilterWeight, QueryPhaseCollector.TerminateAfterChecker terminateAfterChecker, - CollectorManager aggsCollectorManager, + CollectorManager aggsCollectorManager, Float minScore, boolean profile ) { @@ -147,7 +148,7 @@ public final QueryPhaseResult reduce(Collection collectors) throws IO boolean terminatedAfter = false; CollectorResult collectorResult = null; List topDocsCollectors = new ArrayList<>(); - List aggsCollectors = new ArrayList<>(); + List aggsCollectors = new ArrayList<>(); if (profile) { List resultsPerProfiler = new ArrayList<>(); List topDocsCollectorResults = new ArrayList<>(); @@ -165,7 +166,7 @@ public final QueryPhaseResult reduce(Collection collectors) throws IO if (aggsCollectorManager != null) { InternalProfileCollector profileAggsCollector = (InternalProfileCollector) queryPhaseCollector.getAggsCollector(); aggsCollectorResults.add(profileAggsCollector.getCollectorTree()); - aggsCollectors.add(profileAggsCollector.getWrappedCollector()); + aggsCollectors.add((AggregatorCollector) profileAggsCollector.getWrappedCollector()); } } List childrenResults = new ArrayList<>(); @@ -178,16 +179,14 @@ public final QueryPhaseResult reduce(Collection collectors) throws IO for (Collector collector : collectors) { QueryPhaseCollector queryPhaseCollector = (QueryPhaseCollector) collector; topDocsCollectors.add(queryPhaseCollector.getTopDocsCollector()); - aggsCollectors.add(queryPhaseCollector.getAggsCollector()); + aggsCollectors.add((AggregatorCollector) queryPhaseCollector.getAggsCollector()); if (queryPhaseCollector.isTerminatedAfter()) { terminatedAfter = true; } } } if 
(aggsCollectorManager != null) { - @SuppressWarnings("unchecked") - CollectorManager aggsManager = (CollectorManager) aggsCollectorManager; - aggsManager.reduce(aggsCollectors); + aggsCollectorManager.reduce(aggsCollectors); } TopDocsAndMaxScore topDocsAndMaxScore = reduceTopDocsCollectors(topDocsCollectors); return new QueryPhaseResult(topDocsAndMaxScore, getSortValueFormats(), terminatedAfter, collectorResult); @@ -212,7 +211,7 @@ private static CollectorResult reduceCollectorResults(Collection createQueryPhaseCollectorManager( Weight postFilterWeight, - CollectorManager aggsCollectorManager, + CollectorManager aggsCollectorManager, SearchContext searchContext, boolean hasFilterCollector ) throws IOException { @@ -310,7 +309,7 @@ private static final class EmptyHits extends QueryPhaseCollectorManager { EmptyHits( Weight postFilterWeight, QueryPhaseCollector.TerminateAfterChecker terminateAfterChecker, - CollectorManager aggsCollectorManager, + CollectorManager aggsCollectorManager, Float minScore, boolean profile, @Nullable SortAndFormats sortAndFormats, @@ -375,7 +374,7 @@ private static class WithHits extends QueryPhaseCollectorManager { WithHits( Weight postFilterWeight, QueryPhaseCollector.TerminateAfterChecker terminateAfterChecker, - CollectorManager aggsCollectorManager, + CollectorManager aggsCollectorManager, Float minScore, boolean profile, IndexReader reader, @@ -480,7 +479,7 @@ protected final DocValueFormat[] getSortValueFormats() { private static WithHits forScroll( Weight postFilterWeight, QueryPhaseCollector.TerminateAfterChecker terminateAfterChecker, - CollectorManager aggsCollectorManager, + CollectorManager aggsCollectorManager, Float minScore, boolean profile, IndexReader reader, @@ -540,7 +539,7 @@ public TopDocsAndMaxScore reduceTopDocsCollectors(Collection collecto private static QueryPhaseCollectorManager forCollapsing( Weight postFilterWeight, QueryPhaseCollector.TerminateAfterChecker terminateAfterChecker, - CollectorManager aggsCollectorManager, + CollectorManager aggsCollectorManager, Float minScore, boolean profile, CollapseContext collapseContext, diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 0f3588eee4de6..df14bce8a1dca 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -246,6 +246,7 @@ *
  • - a random seed used to initialize the index random context. * */ +@LuceneTestCase.SuppressCodecs("*") // remove this once we have asserting codec that works with ES @LuceneTestCase.SuppressFileSystems("ExtrasFS") // doesn't work with potential multi data path from test cluster yet public abstract class ESIntegTestCase extends ESTestCase { From 98572e4642775bb42742cfa38a2f2034c20868c4 Mon Sep 17 00:00:00 2001 From: iverase Date: Wed, 23 Aug 2023 17:02:07 +0800 Subject: [PATCH 02/12] fix profiler test --- docs/reference/search/profile.asciidoc | 2 +- .../search/aggregations/AggregatorCollector.java | 10 ++++++++++ .../search/profile/query/InternalProfileCollector.java | 9 +-------- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/docs/reference/search/profile.asciidoc b/docs/reference/search/profile.asciidoc index 7f9cf34ba7dad..52dfb91475c53 100644 --- a/docs/reference/search/profile.asciidoc +++ b/docs/reference/search/profile.asciidoc @@ -740,7 +740,7 @@ The API returns the following result: "time_in_nanos": 22577 }, { - "name": "BucketCollectorWrapper: [BucketCollectorWrapper[bucketCollector=[my_scoped_agg, my_global_agg]]]", + "name": "AggregatorCollector: [my_scoped_agg, my_global_agg]", "reason": "aggregation", "time_in_nanos": 867617 } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorCollector.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorCollector.java index fef12a19872ed..eb77f7b935ba7 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorCollector.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorCollector.java @@ -15,6 +15,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; /** Collector that controls the life cycle of an aggregation document collection. 
*/ @@ -49,4 +50,13 @@ public void finish() throws IOException { aggregator.releaseAggregations(); } } + + @Override + public String toString() { + String[] aggNames = new String[aggregators.length]; + for (int i = 0; i < aggregators.length; i++) { + aggNames[i] = aggregators[i].name(); + } + return Arrays.toString(aggNames); + } } diff --git a/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java b/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java index cc6f8b591e0c6..1886513c58742 100644 --- a/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java +++ b/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java @@ -55,16 +55,9 @@ public Collector getWrappedCollector() { protected String deriveCollectorName(Collector c) { String s = c.getClass().getSimpleName(); - // MultiCollector which wraps multiple BucketCollectors is generated - // via an anonymous class, so this corrects the lack of a name by - // asking the enclosingClass - if (s.equals("")) { - s = c.getClass().getEnclosingClass().getSimpleName(); - } - // Aggregation collector toString()'s include the user-defined agg name if (getReason().equals(CollectorResult.REASON_AGGREGATION) || getReason().equals(CollectorResult.REASON_AGGREGATION_GLOBAL)) { - s += ": [" + c + "]"; + s += ": " + c; } return s; } From a64270177288a529b5f14204428be9c1e47081bc Mon Sep 17 00:00:00 2001 From: iverase Date: Tue, 29 Aug 2023 09:29:28 +0800 Subject: [PATCH 03/12] Update AggregatorTestCase --- .../search/aggregations/AggregationPhase.java | 42 ++-------- .../AggregatorCollectorManager.java | 55 +++++++++++++ .../SignificantTermsAggregationBuilder.java | 5 ++ .../support/AggregationContext.java | 2 +- .../search/internal/ContextIndexSearcher.java | 10 ++- .../aggregations/AggregatorTestCase.java | 77 +++++++++++-------- 6 files changed, 122 insertions(+), 69 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/search/aggregations/AggregatorCollectorManager.java diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index 7da7c10b96257..a4d3c3c0178ab 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -7,7 +7,6 @@ */ package org.elasticsearch.search.aggregations; -import org.apache.lucene.search.CollectorManager; import org.elasticsearch.action.search.SearchShardTask; import org.elasticsearch.search.aggregations.support.TimeSeriesIndexSearcher; import org.elasticsearch.search.internal.SearchContext; @@ -15,7 +14,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import java.util.function.Supplier; @@ -38,38 +36,14 @@ public static void preProcess(SearchContext context) { } else { collectorSupplier = () -> newAggregatorCollector(context); } - context.aggregations().registerAggsCollectorManager(new CollectorManager<>() { - @Override - public AggregatorCollector newCollector() { - return collectorSupplier.get(); - } - - @Override - public Void reduce(Collection collectors) { - if (context.queryResult().hasAggs()) { - // no need to compute the aggs twice, they should be computed on a per context basis - return null; - } - if (collectors.size() > 1) { - // we execute this search using more 
than one slice. In order to keep memory requirements - // low, we do a partial reduction here. - final List internalAggregations = new ArrayList<>(collectors.size()); - collectors.forEach(c -> internalAggregations.add(InternalAggregations.from(c.internalAggregations))); - context.queryResult() - .aggregations( - InternalAggregations.topLevelReduce( - internalAggregations, - context.aggregations().getAggregationReduceContextBuilder().forPartialReduction() - ) - ); - } else if (collectors.size() == 1) { - context.queryResult().aggregations(InternalAggregations.from(collectors.iterator().next().internalAggregations)); - } - // disable aggregations so that they don't run on next pages in case of scrolling - context.aggregations(null); - return null; - } - }); + context.aggregations() + .registerAggsCollectorManager( + new AggregatorCollectorManager( + collectorSupplier, + internalAggregations -> context.queryResult().aggregations(internalAggregations), + () -> context.aggregations().getAggregationReduceContextBuilder().forFinalReduction() + ) + ); } private static AggregatorCollector newAggregatorCollector(SearchContext context) { diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorCollectorManager.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorCollectorManager.java new file mode 100644 index 0000000000000..51e302ea9fdd0 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregatorCollectorManager.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.search.aggregations; + +import org.apache.lucene.search.CollectorManager; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** Collector manager that produces {@link AggregatorCollector} and merges them during the reduce phase. */ +public class AggregatorCollectorManager implements CollectorManager { + + private final Supplier collectorSupplier; + private final Consumer internalAggregationsConsumer; + private final Supplier reduceContextSupplier; + + public AggregatorCollectorManager( + Supplier collectorSupplier, + Consumer internalAggregationsConsumer, + Supplier reduceContextSupplier + ) { + this.collectorSupplier = collectorSupplier; + this.internalAggregationsConsumer = internalAggregationsConsumer; + this.reduceContextSupplier = reduceContextSupplier; + } + + @Override + public AggregatorCollector newCollector() throws IOException { + return collectorSupplier.get(); + } + + @Override + public Void reduce(Collection collectors) throws IOException { + if (collectors.size() > 1) { + // we execute this search using more than one slice. In order to keep memory requirements + // low, we do a partial reduction here. 
+ final List internalAggregations = new ArrayList<>(collectors.size()); + collectors.forEach(c -> internalAggregations.add(InternalAggregations.from(c.internalAggregations))); + internalAggregationsConsumer.accept(InternalAggregations.topLevelReduce(internalAggregations, reduceContextSupplier.get())); + } else if (collectors.size() == 1) { + internalAggregationsConsumer.accept(InternalAggregations.from(collectors.iterator().next().internalAggregations)); + } + return null; + } +} diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregationBuilder.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregationBuilder.java index d4589a533c67f..f2f3cb2bcfffc 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregationBuilder.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregationBuilder.java @@ -135,6 +135,11 @@ public boolean supportsSampling() { return true; } + @Override + public boolean supportsParallelCollection() { + return false; + } + @Override protected ValuesSourceType defaultValueSourceType() { return CoreValuesSourceType.KEYWORD; diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java index ec147507f8970..b69b0cd70f118 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java @@ -552,7 +552,7 @@ public void addReleasable(Aggregator aggregator) { } @Override - public void removeReleasable(Aggregator aggregator) { + public synchronized void removeReleasable(Aggregator aggregator) { assert releaseMe.contains(aggregator) : "removing non-existing aggregator [" + aggregator.name() + "] from the the aggregation context"; releaseMe.remove(aggregator); diff --git a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index 6bf4267c3a42f..d5af8e994dfb3 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -345,6 +345,7 @@ public T search(Query query, CollectorManager col weight = createWeight(query, firstCollector.scoreMode(), 1); } catch (@SuppressWarnings("unused") TimeExceededException e) { timeExceeded = true; + doAggregationPostCollection(firstCollector); return collectorManager.reduce(Collections.singletonList(firstCollector)); } return search(weight, collectorManager, firstCollector); @@ -475,14 +476,21 @@ public boolean isCancelled() { @Override public void search(List leaves, Weight weight, Collector collector) throws IOException { collector.setWeight(weight); + boolean success = false; try { for (LeafReaderContext ctx : leaves) { // search each subreader searchLeaf(ctx, weight, collector); } + success = true; } catch (@SuppressWarnings("unused") TimeExceededException e) { timeExceeded = true; } finally { - doAggregationPostCollection(collector); + // Only run post collection if we have timeout or if the search was successful + // otherwise the state of the aggregation might be undefined and running post collection + // might result in an exception + if (success || 
timeExceeded) { + doAggregationPostCollection(collector); + } } } diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index eee0c1b05cdc8..f2b4f4928bd7a 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -24,7 +24,6 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.sandbox.document.HalfFloatPoint; import org.apache.lucene.search.Collector; -import org.apache.lucene.search.CollectorManager; import org.apache.lucene.search.IndexSearcher; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; @@ -121,7 +120,6 @@ import org.elasticsearch.search.aggregations.metrics.MetricsAggregator; import org.elasticsearch.search.aggregations.metrics.MultiValueAggregation; import org.elasticsearch.search.aggregations.metrics.NumericMetricsAggregation; -import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import org.elasticsearch.search.aggregations.support.AggregationContext; import org.elasticsearch.search.aggregations.support.AggregationContext.ProductionAggregationContext; import org.elasticsearch.search.aggregations.support.CoreValuesSourceType; @@ -149,7 +147,6 @@ import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -160,6 +157,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.function.BiFunction; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Stream; import static java.util.Collections.emptyMap; @@ -552,9 +550,9 @@ private A searchAndReduce( MappedFieldType[] fieldTypes = aggTestConfig.fieldTypes(); final IndexReaderContext ctx = searcher.getTopReaderContext(); - final PipelineTree pipelines = builder.buildPipelineTree(); - List internalAggs = new ArrayList<>(); + List internalAggs = new ArrayList<>(); Query rewritten = searcher.rewrite(query); + BigArrays bigArraysForReduction = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), breakerService); if (splitLeavesIntoSeparateAggregators && searcher.getIndexReader().leaves().size() > 0 @@ -589,7 +587,9 @@ private A searchAndReduce( } a.postCollection(); assertEquals(shouldBeCached, context.isCacheable()); - internalAggs.add(a.buildTopLevel()); + List internalAggregations = List.of(a.buildTopLevel()); + assertRoundTrip(internalAggregations); + internalAggs.add(InternalAggregations.from(internalAggregations)); } finally { Releasables.close(context); } @@ -613,54 +613,61 @@ private A searchAndReduce( aggregators.add(root); new TimeSeriesIndexSearcher(searcher, List.of()).search(rewritten, MultiBucketCollector.wrap(true, List.of(root))); root.postCollection(); + List internalAggregations = List.of(root.buildTopLevel()); + assertRoundTrip(internalAggregations); + internalAggs.add(InternalAggregations.from(internalAggregations)); } else { - CollectorManager collectorManager = new CollectorManager<>() { - @Override - public Collector newCollector() throws IOException { - C collector = createAggregator(builder, context); - collector.preCollection(); - aggregators.add(collector); - return MultiBucketCollector.wrap(true, List.of(collector)).asCollector(); - } - 
- @Override - public Void reduce(Collection collectors) { - return null; + Supplier aggregatorSupplier = () -> { + try { + Aggregator aggregator = createAggregator(builder, context); + aggregator.preCollection(); + BucketCollector bucketCollector = MultiBucketCollector.wrap(true, List.of(aggregator)); + return new AggregatorCollector(new Aggregator[] { aggregator }, bucketCollector); + } catch (IOException e) { + throw new AggregationInitializationException("Could not initialize aggregators", e); } }; + Supplier reduceContextSupplier = () -> new AggregationReduceContext.ForPartial( + bigArraysForReduction, + getMockScriptService(), + () -> false, + builder + ); + AggregatorCollectorManager aggregatorCollectorManager = new AggregatorCollectorManager( + aggregatorSupplier, + internalAggs::add, + reduceContextSupplier + ); + if (aggTestConfig.builder().supportsParallelCollection()) { - searcher.search(rewritten, collectorManager); + searcher.search(rewritten, aggregatorCollectorManager); } else { - searcher.search(rewritten, collectorManager.newCollector()); + AggregatorCollector collector = aggregatorCollectorManager.newCollector(); + searcher.search(rewritten, collector); + aggregatorCollectorManager.reduce(List.of(collector)); } } - for (C agg : aggregators) { - internalAggs.add(agg.buildTopLevel()); - } } finally { Releasables.close(context); } } - assertRoundTrip(internalAggs); - BigArrays bigArraysForReduction = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), breakerService); try { if (aggTestConfig.incrementalReduce() && internalAggs.size() > 1) { // sometimes do an incremental reduce int toReduceSize = internalAggs.size(); Collections.shuffle(internalAggs, random()); int r = randomIntBetween(1, toReduceSize); - List toReduce = internalAggs.subList(0, r); + List toReduce = internalAggs.subList(0, r); AggregationReduceContext reduceContext = new AggregationReduceContext.ForPartial( bigArraysForReduction, getMockScriptService(), () -> false, builder ); - A reduced = (A) doReduce(toReduce, reduceContext); internalAggs = new ArrayList<>(internalAggs.subList(r, toReduceSize)); - internalAggs.add(reduced); - assertRoundTrip(internalAggs); + internalAggs.add(InternalAggregations.topLevelReduce(toReduce, reduceContext)); + // assertRoundTrip(internalAggs); } // now do the final reduce @@ -677,12 +684,9 @@ public Void reduce(Collection collectors) { ); @SuppressWarnings("unchecked") - A internalAgg = (A) doReduce(internalAggs, reduceContext); + A internalAgg = (A) doInternalAggregationsReduce(internalAggs, reduceContext); assertRoundTrip(internalAgg); - // materialize any parent pipelines - internalAgg = (A) internalAgg.reducePipelines(internalAgg, reduceContext, pipelines); - doAssertReducedMultiBucketConsumer(internalAgg, reduceBucketConsumer); assertRoundTrip(internalAgg); if (aggTestConfig.builder instanceof ValuesSourceAggregationBuilder.MetricsAggregationBuilder) { @@ -699,6 +703,13 @@ private InternalAggregation doReduce(List aggregators, Aggr for (InternalAggregation aggregator : aggregators) { internalAggregations.add(InternalAggregations.from(List.of(aggregator))); } + return doInternalAggregationsReduce(internalAggregations, reduceContext); + } + + private InternalAggregation doInternalAggregationsReduce( + List internalAggregations, + AggregationReduceContext reduceContext + ) { InternalAggregations aggregations = InternalAggregations.topLevelReduce(internalAggregations, reduceContext); List reduced = aggregations.copyResults(); assertThat(reduced.size(), equalTo(1)); 
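Note between patches: the diffs above replace the old AggregationPhase#execute pass with a per-slice AggregatorCollector plus an AggregatorCollectorManager whose reduce() performs the partial reduction before the query phase returns. The sketch below shows, in one place, how those pieces fit together for a single shard search. Class and method names are taken from the diffs above, but the generic type parameters (the diff rendering strips generics), the imports, and the wrapper class/method are illustrative assumptions; in the real change the manager is registered via registerAggsCollectorManager and driven through QueryPhaseCollectorManager rather than handed to the searcher directly.

import org.apache.lucene.search.Query;
import org.elasticsearch.search.aggregations.Aggregator;
import org.elasticsearch.search.aggregations.AggregationInitializationException;
import org.elasticsearch.search.aggregations.AggregatorCollector;
import org.elasticsearch.search.aggregations.AggregatorCollectorManager;
import org.elasticsearch.search.aggregations.BucketCollector;
import org.elasticsearch.search.aggregations.MultiBucketCollector;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;

import java.io.IOException;
import java.util.List;
import java.util.function.Supplier;

/** Illustrative wiring of the classes introduced above; not part of the patch series. */
class AggregationFlowSketch {

    static void collectAndPartiallyReduce(SearchContext context, ContextIndexSearcher searcher, Query query) throws IOException {
        // One AggregatorCollector per slice, created lazily on the worker thread that owns the slice.
        Supplier<AggregatorCollector> perSliceCollector = () -> {
            try {
                Aggregator[] aggregators = context.aggregations().factories().createTopLevelAggregators();
                BucketCollector bucketCollector = MultiBucketCollector.wrap(true, List.of(aggregators));
                bucketCollector.preCollection();
                return new AggregatorCollector(aggregators, bucketCollector);
            } catch (IOException e) {
                throw new AggregationInitializationException("Could not initialize aggregators", e);
            }
        };

        // Simplification: the manager is passed straight to the searcher here. Post-collection
        // invokes AggregatorCollector#finish(), which builds the top-level InternalAggregations
        // on the worker thread, and reduce() partially reduces the per-slice results into the
        // shard's query result.
        searcher.search(query, new AggregatorCollectorManager(
            perSliceCollector,
            partiallyReduced -> context.queryResult().aggregations(partiallyReduced),
            () -> context.aggregations().getAggregationReduceContextBuilder().forPartialReduction()
        ));
    }
}

The key shift is that Aggregator#buildTopLevel() now runs inside finish() on each worker thread, so the per-slice aggregation trees are built and partially reduced before control returns to the query phase, instead of in a separate AggregationPhase#execute step on the search thread (removed in patch 01).
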
From 48cbb414fbabbca552c8f87710c2f54bcf270bcf Mon Sep 17 00:00:00 2001 From: iverase Date: Fri, 1 Sep 2023 12:54:20 +0800 Subject: [PATCH 04/12] run always in worker thread in aggregatorTestCase --- .../aggregations/AggregatorTestCase.java | 33 ++++--------------- .../topmetrics/TopMetricsAggregatorTests.java | 1 + 2 files changed, 8 insertions(+), 26 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 6c65ca4af408a..637d06af8c16c 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -291,19 +291,6 @@ protected AggregationContext createAggregationContext(IndexReader indexReader, Q AggregationBuilder.DEFAULT_PREALLOCATION * 5, // We don't know how many bytes to preallocate so we grab a hand full DEFAULT_MAX_BUCKETS, false, - fieldTypes - ); - } - - private AggregationContext createAggregationContext(IndexSearcher indexSearcher, Query query, MappedFieldType... fieldTypes) - throws IOException { - return createAggregationContext( - indexSearcher, - createIndexSettings(), - query, - new NoneCircuitBreakerService(), - AggregationBuilder.DEFAULT_PREALLOCATION * 5, // We don't know how many bytes to preallocate so we grab a hand full - DEFAULT_MAX_BUCKETS, false, fieldTypes ); @@ -326,10 +313,11 @@ protected AggregationContext createAggregationContext( long bytesToPreallocate, int maxBucket, boolean isInSortOrderExecutionRequired, + boolean supportsParallelCollection, MappedFieldType... fieldTypes ) throws IOException { return createAggregationContext( - newIndexSearcher(indexReader), + newIndexSearcher(indexReader, supportsParallelCollection), indexSettings, query, breakerService, @@ -507,7 +495,7 @@ protected ScriptService getMockScriptService() { */ protected A searchAndReduce(IndexReader reader, AggTestConfig aggTestConfig) throws IOException { - IndexSearcher searcher = newIndexSearcher(reader); + IndexSearcher searcher = newIndexSearcher(reader, aggTestConfig.builder.supportsParallelCollection()); IndexSettings indexSettings = createIndexSettings(); // First run it to find circuit breaker leaks on the aggregator runWithCrankyCircuitBreaker(indexSettings, searcher, aggTestConfig); @@ -638,14 +626,7 @@ private A searchAndReduce( internalAggs::add, reduceContextSupplier ); - - if (aggTestConfig.builder().supportsParallelCollection()) { - searcher.search(rewritten, aggregatorCollectorManager); - } else { - AggregatorCollector collector = aggregatorCollectorManager.newCollector(); - searcher.search(rewritten, collector); - aggregatorCollectorManager.reduce(List.of(collector)); - } + searcher.search(rewritten, aggregatorCollectorManager); } } finally { Releasables.close(context); @@ -818,7 +799,7 @@ protected void debugTestCase( MappedFieldType... fieldTypes ) throws IOException { // Don't use searchAndReduce because we only want a single aggregator. 
- IndexSearcher searcher = newIndexSearcher(reader); + IndexSearcher searcher = newIndexSearcher(reader, aggregationBuilder.supportsParallelCollection()); if (queryCachingPolicy != null) { searcher.setQueryCachingPolicy(queryCachingPolicy); } @@ -966,7 +947,7 @@ protected static DirectoryReader wrapInMockESDirectoryReader(DirectoryReader dir /** * Creates a {@link ContextIndexSearcher} that supports concurrency running each segment in a different thread. */ - private IndexSearcher newIndexSearcher(IndexReader indexReader) throws IOException { + private IndexSearcher newIndexSearcher(IndexReader indexReader, boolean supportsParallelCollection) throws IOException { return new ContextIndexSearcher( indexReader, IndexSearcher.getDefaultSimilarity(), @@ -975,7 +956,7 @@ private IndexSearcher newIndexSearcher(IndexReader indexReader) throws IOExcepti indexReader instanceof DirectoryReader ? randomBoolean() : false, // we can only wrap DirectoryReader instances this.threadPoolExecutor, this.threadPoolExecutor.getMaximumPoolSize(), - 1 // forces multiple slices + supportsParallelCollection ? 1 /* forces multiple slices */ : Integer.MAX_VALUE // one slice ); } diff --git a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java index 9abd4ca5faa59..cd36af992b0ab 100644 --- a/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java +++ b/x-pack/plugin/analytics/src/test/java/org/elasticsearch/xpack/analytics/topmetrics/TopMetricsAggregatorTests.java @@ -378,6 +378,7 @@ public void testTonsOfBucketsTriggersBreaker() throws IOException { breaker, builder.bytesToPreallocate(), MultiBucketConsumerService.DEFAULT_MAX_BUCKETS, + true, false, doubleFields() ) From 7025ae92b6b8cf5b99c38d8dcb705710d0c7b5ff Mon Sep 17 00:00:00 2001 From: iverase Date: Fri, 1 Sep 2023 13:29:50 +0800 Subject: [PATCH 05/12] fix test --- .../search/profile/query/InternalProfileCollector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java b/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java index 6db91d16074ca..65a8463afab73 100644 --- a/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java +++ b/server/src/main/java/org/elasticsearch/search/profile/query/InternalProfileCollector.java @@ -63,7 +63,7 @@ protected String deriveCollectorName(Collector c) { // Aggregation collector toString()'s include the user-defined agg name if (getReason().equals(CollectorResult.REASON_AGGREGATION) || getReason().equals(CollectorResult.REASON_AGGREGATION_GLOBAL)) { - s += ": [" + c + "]"; + s += ": " + c; } return s; } From 684ebabb1f98c03f3343731bd86592a3b70eae8c Mon Sep 17 00:00:00 2001 From: iverase Date: Mon, 4 Sep 2023 20:03:43 +0800 Subject: [PATCH 06/12] fix docvalues access from different threads --- .../GlobalOrdinalsStringTermsAggregator.java | 36 +++++++++++-------- .../SignificantTermsAggregatorFactory.java | 2 +- .../StringTermsAggregatorFromFilters.java | 25 +++++++------ .../bucket/terms/TermsAggregatorFactory.java | 6 ++-- .../mr/ItemSetMapReduceValueSource.java | 8 ++--- 5 files changed, 44 insertions(+), 33 deletions(-) diff --git 
a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java index 3d00a8d8f10fa..070334d9287d1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java @@ -14,6 +14,7 @@ import org.apache.lucene.util.ArrayUtil; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.util.LongArray; import org.elasticsearch.common.util.LongHash; @@ -56,8 +57,9 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr protected final ValuesSource.Bytes.WithOrdinals valuesSource; private final LongPredicate acceptedGlobalOrdinals; + + private final CheckedSupplier valuesSupplier; private final long valueCount; - private final GlobalOrdLookupFunction lookupGlobalOrd; protected final CollectionStrategy collectionStrategy; protected int segmentsWithSingleValuedOrds = 0; protected int segmentsWithMultiValuedOrds = 0; @@ -71,7 +73,7 @@ public GlobalOrdinalsStringTermsAggregator( AggregatorFactories factories, Function> resultStrategy, ValuesSource.Bytes.WithOrdinals valuesSource, - SortedSetDocValues values, + CheckedSupplier valuesSupplier, BucketOrder order, DocValueFormat format, BucketCountThresholds bucketCountThresholds, @@ -87,8 +89,8 @@ public GlobalOrdinalsStringTermsAggregator( super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata); this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job. 
this.valuesSource = valuesSource; - this.valueCount = values.getValueCount(); - this.lookupGlobalOrd = values::lookupOrd; + this.valuesSupplier = valuesSupplier; + this.valueCount = valuesSupplier.get().getValueCount(); this.acceptedGlobalOrdinals = acceptedOrds; if (remapGlobalOrds) { this.collectionStrategy = new RemapGlobalOrds(cardinality); @@ -265,7 +267,7 @@ static class LowCardinality extends GlobalOrdinalsStringTermsAggregator { AggregatorFactories factories, Function> resultStrategy, ValuesSource.Bytes.WithOrdinals valuesSource, - SortedSetDocValues values, + CheckedSupplier valuesSupplier, BucketOrder order, DocValueFormat format, BucketCountThresholds bucketCountThresholds, @@ -281,7 +283,7 @@ static class LowCardinality extends GlobalOrdinalsStringTermsAggregator { factories, resultStrategy, valuesSource, - values, + valuesSupplier, order, format, bucketCountThresholds, @@ -588,6 +590,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws B[][] topBucketsPreOrd = buildTopBucketsPerOrd(owningBucketOrds.length); long[] otherDocCount = new long[owningBucketOrds.length]; + GlobalOrdLookupFunction lookupGlobalOrd = valuesSupplier.get()::lookupOrd; for (int ordIdx = 0; ordIdx < owningBucketOrds.length; ordIdx++) { final int size; if (bucketCountThresholds.getMinDocCount() == 0) { @@ -598,7 +601,7 @@ private InternalAggregation[] buildAggregations(long[] owningBucketOrds) throws } PriorityQueue ordered = buildPriorityQueue(size); final int finalOrdIdx = ordIdx; - BucketUpdater updater = bucketUpdater(owningBucketOrds[ordIdx]); + BucketUpdater updater = bucketUpdater(owningBucketOrds[ordIdx], lookupGlobalOrd); collectionStrategy.forEach(owningBucketOrds[ordIdx], new BucketInfoConsumer() { TB spare = null; @@ -618,7 +621,7 @@ public void accept(long globalOrd, long bucketOrd, long docCount) throws IOExcep // Get the top buckets topBucketsPreOrd[ordIdx] = buildBuckets(ordered.size()); for (int i = ordered.size() - 1; i >= 0; --i) { - topBucketsPreOrd[ordIdx][i] = convertTempBucketToRealBucket(ordered.pop()); + topBucketsPreOrd[ordIdx][i] = convertTempBucketToRealBucket(ordered.pop(), lookupGlobalOrd); otherDocCount[ordIdx] -= topBucketsPreOrd[ordIdx][i].getDocCount(); } } @@ -653,7 +656,7 @@ public void accept(long globalOrd, long bucketOrd, long docCount) throws IOExcep * Update fields in {@code spare} to reflect information collected for * this bucket ordinal. */ - abstract BucketUpdater bucketUpdater(long owningBucketOrd) throws IOException; + abstract BucketUpdater bucketUpdater(long owningBucketOrd, GlobalOrdLookupFunction lookupGlobalOrd) throws IOException; /** * Build a {@link PriorityQueue} to sort the buckets. After we've @@ -675,7 +678,7 @@ public void accept(long globalOrd, long bucketOrd, long docCount) throws IOExcep /** * Convert a temporary bucket into a real bucket. */ - abstract B convertTempBucketToRealBucket(TB temp) throws IOException; + abstract B convertTempBucketToRealBucket(TB temp, GlobalOrdLookupFunction lookupGlobalOrd) throws IOException; /** * Build the sub-aggregations into the buckets. 
This will usually @@ -735,7 +738,7 @@ OrdBucket buildEmptyTemporaryBucket() { } @Override - BucketUpdater bucketUpdater(long owningBucketOrd) throws IOException { + BucketUpdater bucketUpdater(long owningBucketOrd, GlobalOrdLookupFunction lookupGlobalOrd) throws IOException { return (spare, globalOrd, bucketOrd, docCount) -> { spare.globalOrd = globalOrd; spare.bucketOrd = bucketOrd; @@ -748,7 +751,8 @@ PriorityQueue buildPriorityQueue(int size) { return new BucketPriorityQueue<>(size, partiallyBuiltBucketComparator); } - StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp) throws IOException { + @Override + StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp, GlobalOrdLookupFunction lookupGlobalOrd) throws IOException { BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd)); StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format); result.bucketOrd = temp.bucketOrd; @@ -871,7 +875,8 @@ private long subsetSize(long owningBucketOrd) { } @Override - BucketUpdater bucketUpdater(long owningBucketOrd) throws IOException { + BucketUpdater bucketUpdater(long owningBucketOrd, GlobalOrdLookupFunction lookupGlobalOrd) + throws IOException { long subsetSize = subsetSize(owningBucketOrd); return (spare, globalOrd, bucketOrd, docCount) -> { spare.bucketOrd = bucketOrd; @@ -895,7 +900,10 @@ PriorityQueue buildPriorityQueue(int size) { } @Override - SignificantStringTerms.Bucket convertTempBucketToRealBucket(SignificantStringTerms.Bucket temp) throws IOException { + SignificantStringTerms.Bucket convertTempBucketToRealBucket( + SignificantStringTerms.Bucket temp, + GlobalOrdLookupFunction lookupGlobalOrd + ) throws IOException { return temp; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java index d8a3e9097dc5c..2cadbd3d43494 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/SignificantTermsAggregatorFactory.java @@ -371,7 +371,7 @@ Aggregator create( factories, a -> a.new SignificantTermsResults(lookup, significanceHeuristic, cardinality), ordinalsValuesSource, - values, + () -> TermsAggregatorFactory.globalOrdsValues(context, ordinalsValuesSource), null, format, bucketCountThresholds, diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregatorFromFilters.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregatorFromFilters.java index 9f5c2b0136155..0e0db3ab5054f 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregatorFromFilters.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/StringTermsAggregatorFromFilters.java @@ -12,6 +12,7 @@ import org.apache.lucene.index.TermsEnum; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.index.query.TermQueryBuilder; import org.elasticsearch.search.DocValueFormat; @@ -62,12 +63,11 @@ static StringTermsAggregatorFromFilters adaptIntoFiltersOrNull( BucketOrder order, BucketCountThresholds bucketCountThresholds, 
LongPredicate acceptedOrds, - SortedSetDocValues values + CheckedSupplier valuesSupplier ) throws IOException { if (false == valuesSourceConfig.alignesWithSearchIndex()) { return null; } - TermsEnum terms = values.termsEnum(); FilterByFilterAggregator.AdapterBuilder filterByFilterBuilder = new FilterByFilterAggregator.AdapterBuilder( name, @@ -91,10 +91,12 @@ protected StringTermsAggregatorFromFilters adapt( valuesSourceConfig.format(), order, bucketCountThresholds, - terms + valuesSupplier ); } }; + SortedSetDocValues values = valuesSupplier.get(); + TermsEnum terms = values.termsEnum(); String field = valuesSourceConfig.fieldContext().field(); for (long ord = 0; ord < values.getValueCount(); ord++) { if (acceptedOrds.test(ord) == false) { @@ -119,7 +121,7 @@ protected StringTermsAggregatorFromFilters adapt( private final DocValueFormat format; private final BucketOrder order; private final BucketCountThresholds bucketCountThresholds; - private final TermsEnum terms; + private final CheckedSupplier valuesSupplier; public StringTermsAggregatorFromFilters( Aggregator parent, @@ -129,14 +131,14 @@ public StringTermsAggregatorFromFilters( DocValueFormat format, BucketOrder order, BucketCountThresholds bucketCountThresholds, - TermsEnum terms + CheckedSupplier valuesSupplier ) throws IOException { super(parent, subAggregators, delegate); this.showTermDocCountError = showTermDocCountError; this.format = format; this.order = order; this.bucketCountThresholds = bucketCountThresholds; - this.terms = terms; + this.valuesSupplier = valuesSupplier; } @Override @@ -160,6 +162,7 @@ protected InternalAggregation adapt(InternalAggregation delegateResult) throws I if (minDocCount == 0 && bucketCountThresholds.getMinDocCount() > 0) { minDocCount = 1; } + TermsEnum terms = valuesSupplier.get().termsEnum(); if (filters.getBuckets().size() > bucketCountThresholds.getShardSize()) { PriorityQueue queue = new PriorityQueue(bucketCountThresholds.getShardSize()) { private final Comparator comparator = order.comparator(); @@ -190,7 +193,7 @@ protected boolean lessThan(OrdBucket a, OrdBucket b) { buckets = new ArrayList<>(queue.size()); if (isKeyOrder(order) == false) { for (OrdBucket b : queue) { - buckets.add(buildBucket(b)); + buckets.add(buildBucket(b, terms)); } Collections.sort(buckets, reduceOrder.comparator()); } else { @@ -201,7 +204,7 @@ protected boolean lessThan(OrdBucket a, OrdBucket b) { * instead of O(n), but such is life. And n shouldn't be too big. */ while (queue.size() > 0) { - buckets.add(buildBucket(queue.pop())); + buckets.add(buildBucket(queue.pop(), terms)); } // The buckets come off last to first so we need to flip them. 
Collections.reverse(buckets); @@ -212,7 +215,7 @@ protected boolean lessThan(OrdBucket a, OrdBucket b) { if (b.getDocCount() < minDocCount) { continue; } - buckets.add(buildBucket(b)); + buckets.add(buildBucket(b, terms)); } Collections.sort(buckets, reduceOrder.comparator()); } @@ -232,12 +235,12 @@ protected boolean lessThan(OrdBucket a, OrdBucket b) { ); } - private StringTerms.Bucket buildBucket(OrdBucket b) throws IOException { + private StringTerms.Bucket buildBucket(OrdBucket b, TermsEnum terms) throws IOException { terms.seekExact(b.globalOrd); return new StringTerms.Bucket(BytesRef.deepCopyOf(terms.term()), b.getDocCount(), b.aggregations, showTermDocCountError, 0, format); } - private StringTerms.Bucket buildBucket(InternalFilters.InternalBucket b) throws IOException { + private StringTerms.Bucket buildBucket(InternalFilters.InternalBucket b, TermsEnum terms) throws IOException { terms.seekExact(Long.parseLong(b.getKey())); return new StringTerms.Bucket( BytesRef.deepCopyOf(terms.term()), diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java index 2bc9820a52e7a..e13ee43df5746 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/TermsAggregatorFactory.java @@ -439,7 +439,7 @@ Aggregator create( order, bucketCountThresholds, gloabalOrdsFilter(includeExclude, valuesSourceConfig.format(), values), - values + () -> globalOrdsValues(context, ordinalsValuesSource) ); if (adapted != null) { /* @@ -485,7 +485,7 @@ Aggregator create( factories, a -> a.new StandardTermsResults(), ordinalsValuesSource, - values, + () -> globalOrdsValues(context, ordinalsValuesSource), order, valuesSourceConfig.format(), bucketCountThresholds, @@ -529,7 +529,7 @@ Aggregator create( factories, a -> a.new StandardTermsResults(), ordinalsValuesSource, - values, + () -> globalOrdsValues(context, ordinalsValuesSource), order, valuesSourceConfig.format(), bucketCountThresholds, diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceValueSource.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceValueSource.java index 24e8da132ca46..8a7d2afa958d9 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceValueSource.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/aggs/frequentitemsets/mr/ItemSetMapReduceValueSource.java @@ -183,7 +183,7 @@ static class GlobalOrdinalsStrategy implements ExecutionStrategy { private final Field field; private final Bytes.WithOrdinals source; - private final SortedSetDocValues docValues; + private SortedSetDocValues docValues; private final LongBitSet bitSetFilter; GlobalOrdinalsStrategy( @@ -198,15 +198,15 @@ static class GlobalOrdinalsStrategy implements ExecutionStrategy { bitSetFilter = globalOrdinalsFilter != null ? 
globalOrdinalsFilter.acceptedGlobalOrdinals(source.globalOrdinalsValues(ctx)) : null; - this.docValues = source.globalOrdinalsValues(ctx); } @Override public ValueCollector getValueCollector(LeafReaderContext ctx) throws IOException { - final SortedSetDocValues values = source.globalOrdinalsValues(ctx); + this.docValues = source.globalOrdinalsValues(ctx); + ; final Tuple> empty = new Tuple<>(field, Collections.emptyList()); - + final SortedSetDocValues values = this.docValues; return doc -> { if (values.advanceExact(doc)) { int valuesCount = values.docValueCount(); From 41954861c3d9f37e0149ceea99a263b3a3d5c489 Mon Sep 17 00:00:00 2001 From: iverase Date: Tue, 5 Sep 2023 07:16:37 +0800 Subject: [PATCH 07/12] for partial reduction --- .../org/elasticsearch/search/aggregations/AggregationPhase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java index de2e1143d9786..ce810507f3082 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java @@ -41,7 +41,7 @@ public static void preProcess(SearchContext context) { new AggregatorCollectorManager( collectorSupplier, internalAggregations -> context.queryResult().aggregations(internalAggregations), - () -> context.aggregations().getAggregationReduceContextBuilder().forFinalReduction() + () -> context.aggregations().getAggregationReduceContextBuilder().forPartialReduction() ) ); } From 5ac1b3057cff072e2d82e932697215ea022be7bb Mon Sep 17 00:00:00 2001 From: iverase Date: Tue, 5 Sep 2023 13:12:17 +0800 Subject: [PATCH 08/12] remove `@LuceneTestCase.SuppressCodecs("*")` --- .../src/main/java/org/elasticsearch/test/ESIntegTestCase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 60102c0563aa4..7711164eebf75 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -252,7 +252,6 @@ *
  • - a random seed used to initialize the index random context. * */ -@LuceneTestCase.SuppressCodecs("*") // remove this once we have asserting codec that works with ES @LuceneTestCase.SuppressFileSystems("ExtrasFS") // doesn't work with potential multi data path from test cluster yet public abstract class ESIntegTestCase extends ESTestCase { From 9527a8cd9a600f8f30e9501c4d2d156dfb37890e Mon Sep 17 00:00:00 2001 From: iverase Date: Tue, 12 Sep 2023 12:01:00 +0200 Subject: [PATCH 09/12] disable timeouts per thread --- .../search/internal/ContextIndexSearcher.java | 16 +++++- .../search/query/QueryPhase.java | 50 ++++++++----------- 2 files changed, 35 insertions(+), 31 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index d5af8e994dfb3..863e50b792441 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -54,10 +54,12 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; @@ -91,6 +93,7 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable { // don't create slices with less than this number of docs private final int minimumDocsPerSlice; + private final Map timeoutOverwrites = new ConcurrentHashMap<>(); private volatile boolean timeExceeded = false; /** constructor for non-concurrent search */ @@ -489,7 +492,14 @@ public void search(List leaves, Weight weight, Collector coll // otherwise the state of the aggregation might be undefined and running post collection // might result in an exception if (success || timeExceeded) { - doAggregationPostCollection(collector); + try { + // Search phase has finished, no longer need to check for timeout + // otherwise the aggregation post-collection phase might get cancelled. 
+ timeoutOverwrites.put(Thread.currentThread(), true); + doAggregationPostCollection(collector); + } finally { + timeoutOverwrites.remove(Thread.currentThread()); + } } } } @@ -506,7 +516,9 @@ public boolean timeExceeded() { } public void throwTimeExceededException() { - throw new TimeExceededException(); + if (timeoutOverwrites.getOrDefault(Thread.currentThread(), false) == false) { + throw new TimeExceededException(); + } } private static class TimeExceededException extends RuntimeException { diff --git a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java index 1761855fb5a2f..3044d15ab8552 100644 --- a/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/elasticsearch/search/query/QueryPhase.java @@ -201,36 +201,28 @@ static void addCollectorsAndSearch(SearchContext searchContext) throws QueryPhas searcher.addQueryCancellation(timeoutRunnable); } - try { - QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager); - if (searchContext.getProfilers() != null) { - searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult()); - } - queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats()); - if (searcher.timeExceeded()) { - assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set"; - if (searchContext.request().allowPartialSearchResults() == false) { - throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded"); - } - queryResult.searchTimedOut(true); - } - if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) { - queryResult.terminatedEarly(queryPhaseResult.terminatedAfter()); - } - ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); - assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor - || (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */) - : "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass(); - if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) { - queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); - queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); - } - } finally { - // Search phase has finished, no longer need to check for timeout - // otherwise aggregation phase might get cancelled. 
- if (timeoutRunnable != null) { - searcher.removeQueryCancellation(timeoutRunnable); + QueryPhaseResult queryPhaseResult = searcher.search(query, collectorManager); + if (searchContext.getProfilers() != null) { + searchContext.getProfilers().getCurrentQueryProfiler().setCollectorResult(queryPhaseResult.collectorResult()); + } + queryResult.topDocs(queryPhaseResult.topDocsAndMaxScore(), queryPhaseResult.sortValueFormats()); + if (searcher.timeExceeded()) { + assert timeoutRunnable != null : "TimeExceededException thrown even though timeout wasn't set"; + if (searchContext.request().allowPartialSearchResults() == false) { + throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded"); } + queryResult.searchTimedOut(true); + } + if (searchContext.terminateAfter() != SearchContext.DEFAULT_TERMINATE_AFTER) { + queryResult.terminatedEarly(queryPhaseResult.terminatedAfter()); + } + ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); + assert executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor + || (executor instanceof EsThreadPoolExecutor == false /* in case thread pool is mocked out in tests */) + : "SEARCH threadpool should have an executor that exposes EWMA metrics, but is of type " + executor.getClass(); + if (executor instanceof TaskExecutionTimeTrackingEsThreadPoolExecutor rExecutor) { + queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); + queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); } } catch (Exception e) { throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Failed to execute main query", e); From 3f233abd1e7bcf51bf90d97926f249b29da933a1 Mon Sep 17 00:00:00 2001 From: iverase Date: Mon, 18 Sep 2023 11:26:48 +0200 Subject: [PATCH 10/12] address review comments --- .../aggregations/support/AggregationContext.java | 2 ++ .../search/internal/ContextIndexSearcher.java | 13 +++++++------ 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java index b69b0cd70f118..822dd6d983e5c 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/support/AggregationContext.java @@ -553,6 +553,8 @@ public void addReleasable(Aggregator aggregator) { @Override public synchronized void removeReleasable(Aggregator aggregator) { + // Removing an aggregator is done after calling Aggregator#buildTopLevel which happens on an executor thread. + // We need to synchronize the removal because he AggregatorContext it is shared between executor threads. 
assert releaseMe.contains(aggregator) : "removing non-existing aggregator [" + aggregator.name() + "] from the the aggregation context"; releaseMe.remove(aggregator); diff --git a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index 863e50b792441..f2c688f0b952f 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -37,6 +37,7 @@ import org.apache.lucene.util.Bits; import org.apache.lucene.util.SparseFixedBitSet; import org.apache.lucene.util.ThreadInterruptedException; +import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.core.Releasable; import org.elasticsearch.lucene.util.CombinedBitSet; @@ -54,12 +55,10 @@ import java.util.Comparator; import java.util.HashSet; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.CancellationException; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.Future; @@ -93,7 +92,7 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable { // don't create slices with less than this number of docs private final int minimumDocsPerSlice; - private final Map timeoutOverwrites = new ConcurrentHashMap<>(); + private final Set timeoutOverwrites = ConcurrentCollections.newConcurrentSet(); private volatile boolean timeExceeded = false; /** constructor for non-concurrent search */ @@ -495,10 +494,12 @@ public void search(List leaves, Weight weight, Collector coll try { // Search phase has finished, no longer need to check for timeout // otherwise the aggregation post-collection phase might get cancelled. 
- timeoutOverwrites.put(Thread.currentThread(), true); + boolean added = timeoutOverwrites.add(Thread.currentThread()); + assert added; doAggregationPostCollection(collector); } finally { - timeoutOverwrites.remove(Thread.currentThread()); + boolean removed = timeoutOverwrites.remove(Thread.currentThread()); + assert removed; } } } @@ -516,7 +517,7 @@ public boolean timeExceeded() { } public void throwTimeExceededException() { - if (timeoutOverwrites.getOrDefault(Thread.currentThread(), false) == false) { + if (timeoutOverwrites.contains(Thread.currentThread())) { throw new TimeExceededException(); } } From 588cee0502bae1bcf9620b5e7fa768acd63aa600 Mon Sep 17 00:00:00 2001 From: iverase Date: Mon, 18 Sep 2023 11:44:32 +0200 Subject: [PATCH 11/12] address review comments --- .../elasticsearch/search/aggregations/AggregatorTestCase.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java index 58b66f63441a3..d19fd4ecf08a3 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/search/aggregations/AggregatorTestCase.java @@ -648,7 +648,9 @@ private A searchAndReduce( ); internalAggs = new ArrayList<>(internalAggs.subList(r, toReduceSize)); internalAggs.add(InternalAggregations.topLevelReduce(toReduce, reduceContext)); - // assertRoundTrip(internalAggs); + for (InternalAggregations internalAggregation : internalAggs) { + assertRoundTrip(internalAggregation.copyResults()); + } } // now do the final reduce From 4527c13c940017c6e687b4f8452aa83e1a9614f8 Mon Sep 17 00:00:00 2001 From: iverase Date: Mon, 18 Sep 2023 12:30:41 +0200 Subject: [PATCH 12/12] doh --- .../org/elasticsearch/search/internal/ContextIndexSearcher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java index f2c688f0b952f..94bc5a4187435 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ContextIndexSearcher.java @@ -517,7 +517,7 @@ public boolean timeExceeded() { } public void throwTimeExceededException() { - if (timeoutOverwrites.contains(Thread.currentThread())) { + if (timeoutOverwrites.contains(Thread.currentThread()) == false) { throw new TimeExceededException(); } }
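
The last three patches converge on a small pattern that is easy to miss in the diff noise: threads that run aggregation post-collection register themselves in a concurrent set, and the cancellation hook that would normally throw TimeExceededException becomes a no-op for those threads, so a timeout that fired during collection cannot also abort the result building that follows. The sketch below illustrates only that idea; TimeoutGate, runExempt and CheckedRunnable are made-up names, and a plain RuntimeException stands in for the searcher's private TimeExceededException. It follows the corrected guard from the final patch (contains(...) == false).

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class TimeoutGate {

    // Threads currently allowed to ignore the timeout, e.g. while running post-collection.
    private final Set<Thread> timeoutOverrides = ConcurrentHashMap.newKeySet();
    private volatile boolean timeExceeded = false;

    /** Called from the timeout runnable once the search time budget is exhausted. */
    void onTimeout() {
        timeExceeded = true;
        throwTimeExceeded();
    }

    /** Throws unless the current thread has registered itself as exempt. */
    void throwTimeExceeded() {
        if (timeoutOverrides.contains(Thread.currentThread()) == false) {
            throw new RuntimeException("Time exceeded");
        }
    }

    /** Runs the given task with timeout checks suppressed for the current thread. */
    <E extends Exception> void runExempt(CheckedRunnable<E> task) throws E {
        boolean added = timeoutOverrides.add(Thread.currentThread());
        assert added : "thread registered twice";
        try {
            task.run();
        } finally {
            boolean removed = timeoutOverrides.remove(Thread.currentThread());
            assert removed : "thread was not registered";
        }
    }

    boolean timeExceeded() {
        return timeExceeded;
    }

    interface CheckedRunnable<E extends Exception> {
        void run() throws E;
    }
}

A caller would wrap the post-collection step in runExempt, so the search can still report that it timed out while the partially collected aggregations are turned into results undisturbed. Note that the intermediate version of this change threw only for the exempt threads; the one-line "doh" patch above restores the intended inversion.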
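
Earlier in the series, the terms-aggregator changes make the same move in a different spot: the adapter stops capturing a TermsEnum at construction time and instead holds a CheckedSupplier of the doc values, resolving the enum only when buckets are actually built. The sketch below shows just that shape and nothing Elasticsearch-specific; OrdinalValues is a placeholder rather than a Lucene type, and this CheckedSupplier merely mirrors the two-argument checked interface used in the diff.

import java.io.IOException;

final class DeferredValuesSketch {

    @FunctionalInterface
    interface CheckedSupplier<T, E extends Exception> {
        T get() throws E;
    }

    /** Placeholder for the per-ordinal lookup that SortedSetDocValues/TermsEnum provide in the real code. */
    interface OrdinalValues {
        String lookupOrd(long globalOrd) throws IOException;
    }

    private final CheckedSupplier<OrdinalValues, IOException> valuesSupplier;

    DeferredValuesSketch(CheckedSupplier<OrdinalValues, IOException> valuesSupplier) {
        // Only the supplier is stored; no I/O happens on the constructing thread.
        this.valuesSupplier = valuesSupplier;
    }

    /** Resolves the values lazily, on whichever thread ends up building the buckets. */
    String buildBucketKey(long globalOrd) throws IOException {
        OrdinalValues values = valuesSupplier.get();
        return values.lookupOrd(globalOrd);
    }
}

The indirection is the whole point: nothing touches the values until a result is requested, which is what lets the bucket-building step run on a search worker thread instead of the thread that wired the aggregator up, matching how TermsAggregatorFactory now passes () -> globalOrdsValues(context, ordinalsValuesSource) instead of a pre-resolved values instance.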