Skip to content

Commit

Permalink
Add early termination support for concurrent segment search (opensearch-project#8306)

Browse files Browse the repository at this point in the history

Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and raghuvanshraj committed Jul 12, 2023
1 parent 6417fee commit bbb4e51
Show file tree
Hide file tree
Showing 14 changed files with 188 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
- Add support for ignoring missing Javadoc on generated code using annotation ([#7604](https://github.com/opensearch-project/OpenSearch/pull/7604))
- Add partial results support for concurrent segment search ([#8306](https://github.com/opensearch-project/OpenSearch/pull/8306))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.opensearch.common.settings.FeatureFlagSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;

/**
 * Runs the full {@link SearchCancellationIT} suite with the concurrent segment search
 * feature flag enabled, so cancellation behavior is also verified on the concurrent
 * search code path.
 */
public class ConcurrentSegmentSearchCancellationIT extends SearchCancellationIT {
    @Override
    protected Settings featureFlagSettings() {
        Settings.Builder featureSettings = Settings.builder();
        // Seed every built-in feature flag with its default value, then override the
        // concurrent segment search flag below. Setting<?> avoids the raw-type warning.
        for (Setting<?> builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
            featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
        }
        // Force concurrent segment search on for this test class.
        featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true");
        return featureSettings.build();
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search;

import org.opensearch.common.settings.FeatureFlagSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;

/**
 * Runs the full {@link SearchTimeoutIT} suite with the concurrent segment search
 * feature flag enabled, so timeout behavior is also verified on the concurrent
 * search code path.
 */
public class ConcurrentSegmentSearchTimeoutIT extends SearchTimeoutIT {

    @Override
    protected Settings featureFlagSettings() {
        Settings.Builder featureSettings = Settings.builder();
        // Seed every built-in feature flag with its default value, then override the
        // concurrent segment search flag below. Setting<?> avoids the raw-type warning.
        for (Setting<?> builtInFlag : FeatureFlagSettings.BUILT_IN_FEATURE_FLAGS) {
            featureSettings.put(builtInFlag.getKey(), builtInFlag.getDefaultRaw(Settings.EMPTY));
        }
        // Force concurrent segment search on for this test class.
        featureSettings.put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true");
        return featureSettings.build();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
import static org.opensearch.action.support.WriteRequest.RefreshPolicy.IMMEDIATE;
import static org.opensearch.index.query.QueryBuilders.scriptQuery;
import static org.opensearch.search.SearchTimeoutIT.ScriptedTimeoutPlugin.SCRIPT_NAME;
import static org.hamcrest.Matchers.equalTo;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE)
public class SearchTimeoutIT extends OpenSearchIntegTestCase {
Expand All @@ -67,17 +66,37 @@ protected Settings nodeSettings(int nodeOrdinal) {
}

public void testSimpleTimeout() throws Exception {
for (int i = 0; i < 32; i++) {
final int numDocs = 1000;
for (int i = 0; i < numDocs; i++) {
client().prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value").get();
}
refresh("test");

SearchResponse searchResponse = client().prepareSearch("test")
.setTimeout(new TimeValue(10, TimeUnit.MILLISECONDS))
.setTimeout(new TimeValue(5, TimeUnit.MILLISECONDS))
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
.setAllowPartialSearchResults(true)
.get();
assertThat(searchResponse.isTimedOut(), equalTo(true));
assertTrue(searchResponse.isTimedOut());
assertEquals(0, searchResponse.getFailedShards());
assertTrue(numDocs > searchResponse.getHits().getTotalHits().value);
}

/**
 * Happy-path counterpart to the timeout test: with an effectively unlimited timeout
 * (10000 seconds) the same scripted query completes normally — the response is not
 * marked timed out, no shard fails, and every indexed document is counted in the hits.
 */
public void testSimpleDoesNotTimeout() throws Exception {
    // Small corpus so the scripted query finishes well within the timeout.
    final int numDocs = 10;
    for (int i = 0; i < numDocs; i++) {
        client().prepareIndex("test").setId(Integer.toString(i)).setSource("field", "value").get();
    }
    refresh("test");

    SearchResponse searchResponse = client().prepareSearch("test")
        // Generous timeout: this query is expected to finish long before it expires.
        .setTimeout(new TimeValue(10000, TimeUnit.SECONDS))
        // SCRIPT_NAME is the mock script registered by ScriptedTimeoutPlugin below;
        // presumably it delays per-document evaluation — confirm in the plugin.
        .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap())))
        .setAllowPartialSearchResults(true)
        .get();
    // No timeout flag, no shard failures, and the full result set is returned.
    assertFalse(searchResponse.isTimedOut());
    assertEquals(0, searchResponse.getFailedShards());
    assertEquals(numDocs, searchResponse.getHits().getTotalHits().value);
}

public void testPartialResultsIntolerantTimeout() throws Exception {
Expand All @@ -91,7 +110,7 @@ public void testPartialResultsIntolerantTimeout() throws Exception {
.setAllowPartialSearchResults(false) // this line causes timeouts to report failures
.get()
);
assertTrue(ex.toString().contains("Time exceeded"));
assertTrue(ex.toString().contains("QueryPhaseExecutionException[Time exceeded]"));
}

public static class ScriptedTimeoutPlugin extends MockScriptPlugin {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.opensearch.search.profile.query.ProfileWeight;
import org.opensearch.search.profile.query.QueryProfiler;
import org.opensearch.search.profile.query.QueryTimingType;
import org.opensearch.search.query.QueryPhase;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.MinAndMax;
Expand Down Expand Up @@ -103,26 +104,6 @@ public class ContextIndexSearcher extends IndexSearcher implements Releasable {
private MutableQueryTimeout cancellable;
private SearchContext searchContext;

public ContextIndexSearcher(
IndexReader reader,
Similarity similarity,
QueryCache queryCache,
QueryCachingPolicy queryCachingPolicy,
boolean wrapWithExitableDirectoryReader,
Executor executor
) throws IOException {
this(
reader,
similarity,
queryCache,
queryCachingPolicy,
new MutableQueryTimeout(),
wrapWithExitableDirectoryReader,
executor,
null
);
}

public ContextIndexSearcher(
IndexReader reader,
Similarity similarity,
Expand Down Expand Up @@ -310,18 +291,22 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
return;
}

cancellable.checkCancelled();
weight = wrapWeight(weight);
// See please https://github.com/apache/lucene/pull/964
collector.setWeight(weight);
final LeafCollector leafCollector;
try {
cancellable.checkCancelled();
weight = wrapWeight(weight);
// See please https://github.com/apache/lucene/pull/964
collector.setWeight(weight);
leafCollector = collector.getLeafCollector(ctx);
} catch (CollectionTerminatedException e) {
// there is no doc of interest in this reader context
// continue with the following leaf
return;
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
}
// catch early terminated exception and rethrow?
Bits liveDocs = ctx.reader().getLiveDocs();
BitSet liveDocsBitSet = getSparseBitSetOrNull(liveDocs);
if (liveDocsBitSet == null) {
Expand All @@ -332,6 +317,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
}
}
} else {
Expand All @@ -348,6 +336,9 @@ private void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collecto
} catch (CollectionTerminatedException e) {
// collection was terminated prematurely
// continue with the following leaf
} catch (QueryPhase.TimeExceededException e) {
searchContext.setSearchTimedOut(true);
return;
}
}
}
Expand Down Expand Up @@ -492,7 +483,7 @@ private boolean canMatch(LeafReaderContext ctx) throws IOException {
}

private boolean canMatchSearchAfter(LeafReaderContext ctx) throws IOException {
if (searchContext != null && searchContext.request() != null && searchContext.request().source() != null) {
if (searchContext.request() != null && searchContext.request().source() != null) {
// Only applied on primary sort field and primary search_after.
FieldSortBuilder primarySortField = FieldSortBuilder.getPrimaryFieldSortOrNull(searchContext.request().source());
if (primarySortField != null) {
Expand All @@ -512,7 +503,7 @@ private boolean shouldReverseLeafReaderContexts() {
// This is actually beneficial for search queries to start search on latest segments first for time series workload.
// That can slow down ASC order queries on timestamp workload. So to avoid that slowdown, we will reverse leaf
// reader order here.
if (searchContext != null && searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled()) {
if (searchContext.indexShard().isTimeSeriesDescSortOptimizationEnabled()) {
// Only reverse order for asc order sort queries
if (searchContext.sort() != null
&& searchContext.sort().sort != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public abstract class SearchContext implements Releasable {
private final AtomicBoolean closed = new AtomicBoolean(false);
private InnerHitsContext innerHitsContext;

private volatile boolean searchTimedOut;

protected SearchContext() {}

public abstract void setTask(SearchShardTask task);
Expand All @@ -106,6 +108,14 @@ protected SearchContext() {}

public abstract boolean isCancelled();

/**
 * Returns whether this search was flagged as timed out (set via
 * {@link #setSearchTimedOut(boolean)}); backed by a volatile field so the flag
 * is visible across the threads involved in concurrent segment search.
 */
public boolean isSearchTimedOut() {
    return this.searchTimedOut;
}

/**
 * Records that this search hit (or cleared) its time limit; backed by a volatile
 * field so writes from search threads are visible to the coordinating thread.
 */
public void setSearchTimedOut(boolean searchTimedOut) {
    this.searchTimedOut = searchTimedOut;
}

@Override
public final void close() {
if (closed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.ProfileCollectorManager;
import org.opensearch.search.query.QueryPhase.DefaultQueryPhaseSearcher;
import org.opensearch.search.query.QueryPhase.TimeExceededException;

import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.ExecutionException;

import static org.opensearch.search.query.TopDocsCollectorContext.createTopDocsCollectorContext;

Expand Down Expand Up @@ -80,12 +80,12 @@ private static boolean searchWithCollectorManager(
try {
final ReduceableSearchResult result = searcher.search(query, collectorManager);
result.reduce(queryResult);
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
queryResult.terminatedEarly(true);
} catch (TimeExceededException e) {
} catch (RuntimeException re) {
rethrowCauseIfPossible(re, searchContext);
}
if (searchContext.isSearchTimedOut()) {
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
if (searchContext.request().allowPartialSearchResults() == false) {
// Can't rethrow TimeExceededException because not serializable
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
}
queryResult.searchTimedOut(true);
Expand All @@ -101,4 +101,26 @@ private static boolean searchWithCollectorManager(
public AggregationProcessor aggregationProcessor(SearchContext searchContext) {
return aggregationProcessor;
}

/**
 * Unwraps exceptions produced by Lucene's concurrent search execution and rethrows the
 * most meaningful throwable available.
 *
 * <ul>
 *   <li>If {@code re} has no cause, it is rethrown as-is.</li>
 *   <li>If the cause is an {@link ExecutionException} or {@link InterruptedException}
 *       wrapping a deeper cause, that deeper cause is rethrown via the generic
 *       sneaky-throw parameter {@code T}.</li>
 *   <li>Otherwise the cause is wrapped in a {@link QueryPhaseExecutionException}.</li>
 * </ul>
 *
 * @param re            the runtime exception thrown by the concurrent search call
 * @param searchContext used to attach the shard target to the wrapped exception
 * @throws T the unwrapped cause, when one can be extracted
 */
@SuppressWarnings("unchecked") // (T) cast is the standard sneaky-throw idiom; erased at runtime
private static <T extends Exception> void rethrowCauseIfPossible(RuntimeException re, SearchContext searchContext) throws T {
    // Rethrow exception if cause is null
    if (re.getCause() == null) {
        throw re;
    }

    // Unwrap the RuntimeException and ExecutionException from Lucene concurrent search method and rethrow
    if (re.getCause() instanceof ExecutionException || re.getCause() instanceof InterruptedException) {
        Throwable t = re.getCause();
        if (t.getCause() != null) {
            throw (T) t.getCause();
        }
    }

    // Rethrow any unexpected exception types
    throw new QueryPhaseExecutionException(
        searchContext.shardTarget(),
        "Failed to execute concurrent segment search thread",
        re.getCause()
    );
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -354,10 +354,10 @@ private static boolean searchWithCollector(
searcher.search(query, queryCollector);
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
queryResult.terminatedEarly(true);
} catch (TimeExceededException e) {
}
if (searchContext.isSearchTimedOut()) {
assert timeoutSet : "TimeExceededException thrown even though timeout wasn't set";
if (searchContext.request().allowPartialSearchResults() == false) {
// Can't rethrow TimeExceededException because not serializable
throw new QueryPhaseExecutionException(searchContext.shardTarget(), "Time exceeded");
}
queryResult.searchTimedOut(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@
import org.apache.lucene.util.automaton.CompiledAutomaton;
import org.apache.lucene.util.automaton.RegExp;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.tasks.TaskCancelledException;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.AfterClass;
Expand All @@ -59,6 +61,8 @@
import java.util.concurrent.atomic.AtomicBoolean;

import static org.hamcrest.Matchers.equalTo;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class SearchCancellationTests extends OpenSearchTestCase {

Expand Down Expand Up @@ -109,7 +113,8 @@ public void testAddingCancellationActions() throws IOException {
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null
null,
mock(SearchContext.class)
);
NullPointerException npe = expectThrows(NullPointerException.class, () -> searcher.addQueryCancellation(null));
assertEquals("cancellation runnable should not be null", npe.getMessage());
Expand All @@ -123,13 +128,17 @@ public void testAddingCancellationActions() throws IOException {
public void testCancellableCollector() throws IOException {
TotalHitCountCollector collector1 = new TotalHitCountCollector();
Runnable cancellation = () -> { throw new TaskCancelledException("cancelled"); };
SearchContext searchContext = mock(SearchContext.class);
IndexShard indexShard = mock(IndexShard.class);
when(searchContext.indexShard()).thenReturn(indexShard);
ContextIndexSearcher searcher = new ContextIndexSearcher(
reader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null
null,
searchContext
);

searcher.search(new MatchAllDocsQuery(), collector1);
Expand Down Expand Up @@ -157,7 +166,8 @@ public void testExitableDirectoryReader() throws IOException {
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null
null,
mock(SearchContext.class)
);
searcher.addQueryCancellation(cancellation);
CompiledAutomaton automaton = new CompiledAutomaton(new RegExp("a.*").toAutomaton());
Expand Down
Loading

0 comments on commit bbb4e51

Please sign in to comment.