Skip to content

Commit

Permalink
CardinalityIT/NestedIT test failures with concurrent search enabled a…
Browse files Browse the repository at this point in the history
…nd AssertingCodec (#8303)

* CardinalityIT/NestedIT test failures with concurrent search enabled and AssertingCodec
The tests were failing because during the concurrent segment search for each slice the
codec producers for the leafs were initialized by the slice thread. Later in reduce phase,
the post collection happens over those codec producers on the search thread.
With AssertingCodec it verifies that all access is done by the same thread causing the failures

Signed-off-by: Sorabh Hamirwasia <[email protected]>

* Address review comments

Signed-off-by: Sorabh Hamirwasia <[email protected]>

---------

Signed-off-by: Sorabh Hamirwasia <[email protected]>
  • Loading branch information
sohami authored Jul 13, 2023
1 parent 68ddf24 commit 064f265
Show file tree
Hide file tree
Showing 16 changed files with 192 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add safeguard limits for file cache during node level allocation ([#8208](https://github.com/opensearch-project/OpenSearch/pull/8208))
- Move span actions to Scope ([#8411](https://github.com/opensearch-project/OpenSearch/pull/8411))
- Add wrapper tracer implementation ([#8565](https://github.com/opensearch-project/OpenSearch/pull/8565))
- Perform aggregation postCollection in ContextIndexSearcher after searching leaves ([#8303](https://github.com/opensearch-project/OpenSearch/pull/8303))

### Deprecated

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.opensearch.index.search.NestedHelper;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.search.aggregations.BucketCollectorProcessor;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.SearchContextAggregations;
import org.opensearch.search.builder.SearchSourceBuilder;
Expand Down Expand Up @@ -176,7 +177,7 @@ final class DefaultSearchContext extends SearchContext {
private SuggestionSearchContext suggest;
private List<RescoreContext> rescore;
private Profilers profilers;

private BucketCollectorProcessor bucketCollectorProcessor = NO_OP_BUCKET_COLLECTOR_PROCESSOR;
private final Map<String, SearchExtBuilder> searchExtBuilders = new HashMap<>();
private final Map<Class<?>, CollectorManager<? extends Collector, ReduceableSearchResult>> queryCollectorManagers = new HashMap<>();
private final QueryShardContext queryShardContext;
Expand Down Expand Up @@ -919,4 +920,14 @@ public ReaderContext readerContext() {
public InternalAggregation.ReduceContext partial() {
return requestToAggReduceContextBuilder.apply(request.source()).forPartialReduction();
}

@Override
public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor) {
this.bucketCollectorProcessor = bucketCollectorProcessor;
}

@Override
public BucketCollectorProcessor bucketCollectorProcessor() {
return bucketCollectorProcessor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;

/**
Expand Down Expand Up @@ -53,31 +50,12 @@ public Collector newCollector() throws IOException {

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
List<Aggregator> aggregators = new ArrayList<>();

final Deque<Collector> allCollectors = new LinkedList<>(collectors);
while (!allCollectors.isEmpty()) {
final Collector currentCollector = allCollectors.pop();
if (currentCollector instanceof Aggregator) {
aggregators.add((Aggregator) currentCollector);
} else if (currentCollector instanceof InternalProfileCollector) {
if (((InternalProfileCollector) currentCollector).getCollector() instanceof Aggregator) {
aggregators.add((Aggregator) ((InternalProfileCollector) currentCollector).getCollector());
} else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) {
allCollectors.addAll(
Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors())
);
}
} else if (currentCollector instanceof MultiBucketCollector) {
allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors()));
}
}

final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
final List<InternalAggregation> internals = new ArrayList<>(aggregators.size());
context.aggregations().resetBucketMultiConsumer();
for (Aggregator aggregator : aggregators) {
try {
aggregator.postCollection();
// post collection is called in ContextIndexSearcher after search on leaves are completed
internals.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations;

import org.apache.lucene.search.Collector;
import org.apache.lucene.search.MultiCollector;
import org.opensearch.common.lucene.MinimumScoreCollector;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.InternalProfileCollector;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

/**
* Processor to perform collector level processing specific to {@link BucketCollector} in different stages like: a) PostCollection
* after search on each leaf is completed and b) process the collectors to perform reduce after collection is completed
*/
public class BucketCollectorProcessor {

/**
* Performs {@link BucketCollector#postCollection()} on all the {@link BucketCollector} in the given {@link Collector} collector tree
* after the collection of documents on a leaf is completed. This method will be called by different slice threads on its own collector
* tree instance in case of concurrent segment search such that postCollection happens on the same slice thread which initialize and
* perform collection of the documents for a leaf segment. For sequential search case, there is always a single search thread which
* performs both collection and postCollection on {@link BucketCollector}.
* <p>
* This was originally done in {@link org.opensearch.search.aggregations.AggregationProcessor#postProcess(SearchContext)}. But with
* concurrent segment search path this needs to be performed here. There are AssertingCodecs in lucene which validates that the
* DocValues created for a field is always used by the same thread for a request. In concurrent segment search case, the DocValues
* gets initialized on different threads for different segments (or slices). Whereas the postProcess happens as part of reduce phase
* and is performed on the separate thread which is from search threadpool and not from slice threadpool. So two different threads
* performs the access on the DocValues causing the AssertingCodec to fail. From functionality perspective, there is no issue as
* DocValues for each segment is always accessed by a single thread at a time but those threads may be different (e.g. slice thread
* during collection and then search thread during reduce)
* </p>
* <p>
* NOTE: We can evaluate and deprecate this postCollection processing once lucene release the changes described in the
* <a href="https://github.com/apache/lucene/issues/12375">issue-12375</a>. With this new change we should be able to implement
* {@link BucketCollector#postCollection()} functionality using the lucene interface directly such that postCollection gets called
* from the slice thread by lucene itself
* </p>
* @param collectorTree collector tree used by calling thread
*/
public void processPostCollection(Collector collectorTree) throws IOException {
final Queue<Collector> collectors = new LinkedList<>();
collectors.offer(collectorTree);
while (!collectors.isEmpty()) {
Collector currentCollector = collectors.poll();
if (currentCollector instanceof InternalProfileCollector) {
collectors.offer(((InternalProfileCollector) currentCollector).getCollector());
} else if (currentCollector instanceof MinimumScoreCollector) {
collectors.offer(((MinimumScoreCollector) currentCollector).getCollector());
} else if (currentCollector instanceof MultiCollector) {
for (Collector innerCollector : ((MultiCollector) currentCollector).getCollectors()) {
collectors.offer(innerCollector);
}
} else if (currentCollector instanceof BucketCollector) {
((BucketCollector) currentCollector).postCollection();
}
}
}

/**
* Unwraps the input collection of {@link Collector} to get the list of the {@link Aggregator} used by different slice threads. The
* input is expected to contain the collectors related to Aggregations only as that is passed to {@link AggregationCollectorManager}
* during the reduce phase. This list of {@link Aggregator} is used to create {@link InternalAggregation} and optionally perform
* reduce at shard level before returning response to coordinator
* @param collectors collection of aggregation collectors to reduce
* @return list of unwrapped {@link Aggregator}
*/
public List<Aggregator> toAggregators(Collection<Collector> collectors) {
List<Aggregator> aggregators = new ArrayList<>();

final Deque<Collector> allCollectors = new LinkedList<>(collectors);
while (!allCollectors.isEmpty()) {
final Collector currentCollector = allCollectors.pop();
if (currentCollector instanceof Aggregator) {
aggregators.add((Aggregator) currentCollector);
} else if (currentCollector instanceof InternalProfileCollector) {
if (((InternalProfileCollector) currentCollector).getCollector() instanceof Aggregator) {
aggregators.add((Aggregator) ((InternalProfileCollector) currentCollector).getCollector());
} else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) {
allCollectors.addAll(
Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors())
);
}
} else if (currentCollector instanceof MultiBucketCollector) {
allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors()));
}
}
return aggregators;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,16 @@
* avoid the increase in aggregation result sets returned by each shard to coordinator where final reduce happens for results received from
* all the shards
*/
public class ConcurrentAggregationProcessor extends DefaultAggregationProcessor {
public class ConcurrentAggregationProcessor implements AggregationProcessor {

private final BucketCollectorProcessor bucketCollectorProcessor = new BucketCollectorProcessor();

@Override
public void preProcess(SearchContext context) {
try {
if (context.aggregations() != null) {
// update the bucket collector process as there is aggregation in the request
context.setBucketCollectorProcessor(bucketCollectorProcessor);
if (context.aggregations().factories().hasNonGlobalAggregator()) {
context.queryCollectorManagers().put(NonGlobalAggCollectorManager.class, new NonGlobalAggCollectorManager(context));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@
*/
public class DefaultAggregationProcessor implements AggregationProcessor {

private final BucketCollectorProcessor bucketCollectorProcessor = new BucketCollectorProcessor();

@Override
public void preProcess(SearchContext context) {
try {
if (context.aggregations() != null) {
// update the bucket collector process as there is aggregation in the request
context.setBucketCollectorProcessor(bucketCollectorProcessor);
if (context.aggregations().factories().hasNonGlobalAggregator()) {
context.queryCollectorManagers()
.put(NonGlobalAggCollectorManager.class, new NonGlobalAggCollectorManagerWithSingleCollector(context));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ protected void search(List<LeafReaderContext> leaves, Weight weight, Collector c
searchLeaf(leaves.get(i), weight, collector);
}
}
searchContext.bucketCollectorProcessor().processPostCollection(collector);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.search.aggregations.BucketCollectorProcessor;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.SearchContextAggregations;
import org.opensearch.search.collapse.CollapseContext;
Expand Down Expand Up @@ -548,4 +549,14 @@ public ReaderContext readerContext() {
public InternalAggregation.ReduceContext partial() {
return in.partial();
}

@Override
public void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor) {
in.setBucketCollectorProcessor(bucketCollectorProcessor);
}

@Override
public BucketCollectorProcessor bucketCollectorProcessor() {
return in.bucketCollectorProcessor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@
import org.opensearch.search.RescoreDocIds;
import org.opensearch.search.SearchExtBuilder;
import org.opensearch.search.SearchShardTarget;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.BucketCollectorProcessor;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.SearchContextAggregations;
import org.opensearch.search.collapse.CollapseContext;
Expand All @@ -73,6 +75,7 @@
import org.opensearch.search.sort.SortAndFormats;
import org.opensearch.search.suggest.SuggestionSearchContext;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -94,6 +97,20 @@ public abstract class SearchContext implements Releasable {
public static final int TRACK_TOTAL_HITS_DISABLED = -1;
public static final int DEFAULT_TRACK_TOTAL_HITS_UP_TO = 10000;

// no-op bucket collector processor
public static final BucketCollectorProcessor NO_OP_BUCKET_COLLECTOR_PROCESSOR = new BucketCollectorProcessor() {
@Override
public void processPostCollection(Collector collectorTree) {
// do nothing as there is no aggregation collector
}

@Override
public List<Aggregator> toAggregators(Collection<Collector> collectors) {
// should not be called when there is no aggregation collector
throw new IllegalStateException("Unexpected toAggregators call on NO_OP_BUCKET_COLLECTOR_PROCESSOR");
}
};

private final List<Releasable> releasables = new CopyOnWriteArrayList<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private InnerHitsContext innerHitsContext;
Expand Down Expand Up @@ -449,4 +466,9 @@ public String toString() {
public abstract ReaderContext readerContext();

public abstract InternalAggregation.ReduceContext partial();

// processor used for bucket collectors
public abstract void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor);

public abstract BucketCollectorProcessor bucketCollectorProcessor();
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.lucene.tests.util.TestUtil;
import org.apache.lucene.util.automaton.CompiledAutomaton;
import org.apache.lucene.util.automaton.RegExp;
import org.junit.Before;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.search.internal.ContextIndexSearcher;
Expand All @@ -71,6 +72,7 @@ public class SearchCancellationTests extends OpenSearchTestCase {

private static Directory dir;
private static IndexReader reader;
private SearchContext searchContext;

@BeforeClass
public static void setup() throws IOException {
Expand Down Expand Up @@ -106,6 +108,12 @@ public static void cleanup() throws IOException {
reader = null;
}

@Before
public void testSetup() {
searchContext = mock(SearchContext.class);
when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR);
}

public void testAddingCancellationActions() throws IOException {
ContextIndexSearcher searcher = new ContextIndexSearcher(
reader,
Expand All @@ -114,7 +122,7 @@ public void testAddingCancellationActions() throws IOException {
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null,
mock(SearchContext.class)
searchContext
);
NullPointerException npe = expectThrows(NullPointerException.class, () -> searcher.addQueryCancellation(null));
assertEquals("cancellation runnable should not be null", npe.getMessage());
Expand All @@ -128,7 +136,6 @@ public void testAddingCancellationActions() throws IOException {
public void testCancellableCollector() throws IOException {
TotalHitCountCollector collector1 = new TotalHitCountCollector();
Runnable cancellation = () -> { throw new TaskCancelledException("cancelled"); };
SearchContext searchContext = mock(SearchContext.class);
IndexShard indexShard = mock(IndexShard.class);
when(searchContext.indexShard()).thenReturn(indexShard);
ContextIndexSearcher searcher = new ContextIndexSearcher(
Expand Down Expand Up @@ -167,7 +174,7 @@ public void testExitableDirectoryReader() throws IOException {
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null,
mock(SearchContext.class)
searchContext
);
searcher.addQueryCancellation(cancellation);
CompiledAutomaton automaton = new CompiledAutomaton(new RegExp("a.*").toAutomaton());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ public void onRemoval(ShardId shardId, Accountable accountable) {
SearchContext searchContext = mock(SearchContext.class);
IndexShard indexShard = mock(IndexShard.class);
when(searchContext.indexShard()).thenReturn(indexShard);
when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR);
ContextIndexSearcher searcher = new ContextIndexSearcher(
filteredReader,
IndexSearcher.getDefaultSimilarity(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public void setUp() throws Exception {
SearchContext searchContext = mock(SearchContext.class);
IndexShard indexShard = mock(IndexShard.class);
when(searchContext.indexShard()).thenReturn(indexShard);
when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR);
searcher = new ContextIndexSearcher(
reader,
IndexSearcher.getDefaultSimilarity(),
Expand Down
Loading

0 comments on commit 064f265

Please sign in to comment.