diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java index 60f7f64837f5d..0dfa9e1cf9637 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.LogManager; +import org.junit.After; import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionFuture; import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; @@ -59,18 +60,24 @@ import org.opensearch.tasks.TaskCancelledException; import org.opensearch.tasks.TaskInfo; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.transport.TransportException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import static org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY; import static org.opensearch.index.query.QueryBuilders.scriptQuery; import static org.opensearch.search.SearchCancellationIT.ScriptedBlockPlugin.SCRIPT_NAME; +import static org.opensearch.search.SearchService.NO_TIMEOUT; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertFailures; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.equalTo; @@ -96,6 +103,11 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } + @After + public void cleanup() { + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder().putNull("*")).get(); + } + private void indexTestData() { for (int i = 0; i < 5; i++) { // Make sure we have a few segments @@ -153,15 +165,51 @@ private SearchResponse ensureSearchWasCancelled(ActionFuture sea SearchResponse response = searchResponse.actionGet(); logger.info("Search response {}", response); assertNotEquals("At least one shard should have failed", 0, response.getFailedShards()); + verifyCancellationException(response.getShardFailures()); return response; } catch (SearchPhaseExecutionException ex) { logger.info("All shards failed with", ex); + verifyCancellationException(ex.shardFailures()); return null; } } - public void testCancellationDuringQueryPhase() throws Exception { + private void ensureMSearchWasCancelled(ActionFuture mSearchResponse, + Set expectedFailedChildRequests) { + MultiSearchResponse response = mSearchResponse.actionGet(); + Set actualFailedChildRequests = new HashSet<>(); + for (int i = 0; i < response.getResponses().length; ++i) { + SearchResponse sResponse = response.getResponses()[i].getResponse(); + // check if response is null means all the shard failed for this search request + if (sResponse == null) { + Exception ex = response.getResponses()[i].getFailure(); + assertTrue(ex instanceof SearchPhaseExecutionException); + verifyCancellationException(((SearchPhaseExecutionException)ex).shardFailures()); + actualFailedChildRequests.add(i); + + } else if (sResponse.getShardFailures().length > 0) { + verifyCancellationException(sResponse.getShardFailures()); + actualFailedChildRequests.add(i); + } + } + assertEquals("Actual child request with cancellation failure is different that expected", expectedFailedChildRequests, + actualFailedChildRequests); + } + private void verifyCancellationException(ShardSearchFailure[] failures) { + for (ShardSearchFailure searchFailure : failures) { + // failure may happen while executing the search or while sending shard request for next phase. + // Below assertion is handling both the cases + final Throwable topFailureCause = searchFailure.getCause(); + assertTrue(searchFailure.toString(), topFailureCause instanceof TransportException || + topFailureCause instanceof TaskCancelledException); + if (topFailureCause instanceof TransportException) { + assertTrue(topFailureCause.getCause() instanceof TaskCancelledException); + } + } + } + + public void testCancellationDuringQueryPhase() throws Exception { List plugins = initBlockFactory(); indexTestData(); @@ -178,8 +226,49 @@ public void testCancellationDuringQueryPhase() throws Exception { ensureSearchWasCancelled(searchResponse); } - public void testCancellationDuringFetchPhase() throws Exception { + public void testCancellationDuringQueryPhaseUsingRequestParameter() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + + TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + ActionFuture searchResponse = client().prepareSearch("test") + .setCancelAfterTimeInterval(cancellationTimeout) + .setAllowPartialSearchResults(randomBoolean()) + .setQuery( + scriptQuery(new Script( + ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .execute(); + awaitForBlock(plugins); + // sleep for cancellation timeout to ensure scheduled cancellation task is actually executed + Thread.sleep(cancellationTimeout.getMillis()); + // unblock the search thread + disableBlocks(plugins); + ensureSearchWasCancelled(searchResponse); + } + + public void testCancellationDuringQueryPhaseUsingClusterSetting() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + + TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() + .put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, cancellationTimeout) + .build()).get(); + ActionFuture searchResponse = client().prepareSearch("test") + .setAllowPartialSearchResults(randomBoolean()) + .setQuery( + scriptQuery(new Script( + ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .execute(); + awaitForBlock(plugins); + // sleep for cluster cancellation timeout to ensure scheduled cancellation task is actually executed + Thread.sleep(cancellationTimeout.getMillis()); + // unblock the search thread + disableBlocks(plugins); + ensureSearchWasCancelled(searchResponse); + } + public void testCancellationDuringFetchPhase() throws Exception { List plugins = initBlockFactory(); indexTestData(); @@ -196,8 +285,24 @@ public void testCancellationDuringFetchPhase() throws Exception { ensureSearchWasCancelled(searchResponse); } - public void testCancellationOfScrollSearches() throws Exception { + public void testCancellationDuringFetchPhaseUsingRequestParameter() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + ActionFuture searchResponse = client().prepareSearch("test") + .setCancelAfterTimeInterval(cancellationTimeout) + .addScriptField("test_field", + new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()) + ).execute(); + awaitForBlock(plugins); + // sleep for request cancellation timeout to ensure scheduled cancellation task is actually executed + Thread.sleep(cancellationTimeout.getMillis()); + // unblock the search thread + disableBlocks(plugins); + ensureSearchWasCancelled(searchResponse); + } + public void testCancellationOfScrollSearches() throws Exception { List plugins = initBlockFactory(); indexTestData(); @@ -221,6 +326,29 @@ public void testCancellationOfScrollSearches() throws Exception { } } + public void testCancellationOfFirstScrollSearchRequestUsingRequestParameter() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + ActionFuture searchResponse = client().prepareSearch("test") + .setScroll(TimeValue.timeValueSeconds(10)) + .setCancelAfterTimeInterval(cancellationTimeout) + .setSize(5) + .setQuery( + scriptQuery(new Script( + ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) + .execute(); + + awaitForBlock(plugins); + Thread.sleep(cancellationTimeout.getMillis()); + disableBlocks(plugins); + SearchResponse response = ensureSearchWasCancelled(searchResponse); + if (response != null) { + // The response might not have failed on all shards - we need to clean scroll + logger.info("Cleaning scroll with id {}", response.getScrollId()); + client().prepareClearScroll().addScrollId(response.getScrollId()).get(); + } + } public void testCancellationOfScrollSearchesOnFollowupRequests() throws Exception { @@ -266,6 +394,93 @@ public void testCancellationOfScrollSearchesOnFollowupRequests() throws Exceptio client().prepareClearScroll().addScrollId(scrollId).get(); } + public void testNoCancellationOfScrollSearchOnFollowUpRequest() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + + // Disable block so the first request would pass + disableBlocks(plugins); + TimeValue keepAlive = TimeValue.timeValueSeconds(5); + TimeValue cancellationTimeout = TimeValue.timeValueSeconds(2); + SearchResponse searchResponse = client().prepareSearch("test") + .setScroll(keepAlive) + .setCancelAfterTimeInterval(cancellationTimeout) + .setSize(2) + .setQuery( + scriptQuery(new Script( + ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .get(); + + assertNotNull(searchResponse.getScrollId()); + // since the previous scroll response is received before cancellation timeout, the scheduled task will be cancelled. It will not + // be used for the subsequent scroll request, as request is of SearchScrollRequest type instead of SearchRequest type + // Enable block so the second request would block + for (ScriptedBlockPlugin plugin : plugins) { + plugin.reset(); + plugin.enableBlock(); + } + + String scrollId = searchResponse.getScrollId(); + ActionFuture scrollResponse = client().prepareSearchScroll(searchResponse.getScrollId()) + .setScroll(keepAlive).execute(); + + awaitForBlock(plugins); + // sleep for cancellation timeout to ensure there is no scheduled task for cancellation + Thread.sleep(cancellationTimeout.getMillis()); + disableBlocks(plugins); + + // wait for response and ensure there is no failure + SearchResponse response = scrollResponse.get(); + assertEquals(0, response.getFailedShards()); + scrollId = response.getScrollId(); + client().prepareClearScroll().addScrollId(scrollId).get(); + } + + public void testDisableCancellationAtRequestLevel() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() + .put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, cancellationTimeout) + .build()).get(); + ActionFuture searchResponse = client().prepareSearch("test") + .setAllowPartialSearchResults(randomBoolean()) + .setCancelAfterTimeInterval(NO_TIMEOUT) + .setQuery( + scriptQuery(new Script( + ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .execute(); + awaitForBlock(plugins); + // sleep for cancellation timeout to ensure there is no scheduled task for cancellation + Thread.sleep(cancellationTimeout.getMillis()); + // unblock the search thread + disableBlocks(plugins); + // ensure search was successful since cancellation was disabled at request level + assertEquals(0, searchResponse.get().getFailedShards()); + } + + public void testDisableCancellationAtClusterLevel() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() + .put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, NO_TIMEOUT) + .build()).get(); + ActionFuture searchResponse = client().prepareSearch("test") + .setAllowPartialSearchResults(randomBoolean()) + .setQuery( + scriptQuery(new Script( + ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .execute(); + awaitForBlock(plugins); + // sleep for cancellation timeout to ensure there is no scheduled task for cancellation + Thread.sleep(cancellationTimeout.getMillis()); + // unblock the search thread + disableBlocks(plugins); + // ensure search was successful since cancellation was disabled at request level + assertEquals(0, searchResponse.get().getFailedShards()); + } + public void testCancelMultiSearch() throws Exception { List plugins = initBlockFactory(); indexTestData(); @@ -287,6 +502,70 @@ public void testCancelMultiSearch() throws Exception { } } + public void testMSearchChildRequestCancellationWithClusterLevelTimeout() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() + .put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, cancellationTimeout) + .build()).get(); + ActionFuture mSearchResponse = client().prepareMultiSearch() + .add(client().prepareSearch("test").setAllowPartialSearchResults(randomBoolean()) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, + Collections.emptyMap())))) + .add(client().prepareSearch("test").setAllowPartialSearchResults(randomBoolean()).setRequestCache(false) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, + Collections.emptyMap())))) + .execute(); + awaitForBlock(plugins); + // sleep for cluster cancellation timeout to ensure scheduled cancellation task is actually executed + Thread.sleep(cancellationTimeout.getMillis()); + // unblock the search thread + disableBlocks(plugins); + // both child requests are expected to fail + final Set expectedFailedRequests = new HashSet<>(); + expectedFailedRequests.add(0); + expectedFailedRequests.add(1); + ensureMSearchWasCancelled(mSearchResponse, expectedFailedRequests); + } + + /** + * Verifies cancellation of sub search request with mix of request level and cluster level timeout parameter + * @throws Exception in case of unexpected errors + */ + public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + TimeValue reqCancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + TimeValue clusterCancellationTimeout = new TimeValue(3, TimeUnit.SECONDS); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() + .put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, clusterCancellationTimeout) + .build()).get(); + ActionFuture mSearchResponse = client().prepareMultiSearch() + .add(client().prepareSearch("test").setAllowPartialSearchResults(randomBoolean()) + .setCancelAfterTimeInterval(reqCancellationTimeout) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, + Collections.emptyMap())))) + .add(client().prepareSearch("test").setAllowPartialSearchResults(randomBoolean()) + .setCancelAfterTimeInterval(NO_TIMEOUT) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, + Collections.emptyMap())))) + .add(client().prepareSearch("test").setAllowPartialSearchResults(randomBoolean()).setRequestCache(false) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, + Collections.emptyMap())))) + .execute(); + awaitForBlock(plugins); + // sleep for cluster cancellation timeout to ensure scheduled cancellation task is actually executed + Thread.sleep(Math.max(reqCancellationTimeout.getMillis(), clusterCancellationTimeout.getMillis())); + // unblock the search thread + disableBlocks(plugins); + // only first and last child request are expected to fail + final Set expectedFailedRequests = new HashSet<>(); + expectedFailedRequests.add(0); + expectedFailedRequests.add(2); + ensureMSearchWasCancelled(mSearchResponse, expectedFailedRequests); + } + public static class ScriptedBlockPlugin extends MockScriptPlugin { static final String SCRIPT_NAME = "search_block"; diff --git a/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java b/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java index 0b217e892585c..7e075cede5688 100644 --- a/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java @@ -66,6 +66,7 @@ import static org.opensearch.common.xcontent.support.XContentMapValues.nodeBooleanValue; import static org.opensearch.common.xcontent.support.XContentMapValues.nodeStringArrayValue; import static org.opensearch.common.xcontent.support.XContentMapValues.nodeStringValue; +import static org.opensearch.common.xcontent.support.XContentMapValues.nodeTimeValue; /** * A multi search API request. @@ -272,6 +273,9 @@ public static void readMultiLineFormat(BytesReference data, allowNoIndices = value; } else if ("ignore_throttled".equals(entry.getKey()) || "ignoreThrottled".equals(entry.getKey())) { ignoreThrottled = value; + } else if ("cancel_after_time_interval".equals(entry.getKey()) || + "cancelAfterTimeInterval".equals(entry.getKey())) { + searchRequest.setCancelAfterTimeInterval(nodeTimeValue(value, null)); } else { throw new IllegalArgumentException("key [" + entry.getKey() + "] is not supported in the metadata section"); } @@ -362,6 +366,9 @@ public static void writeSearchRequestParams(SearchRequest request, XContentBuild if (request.allowPartialSearchResults() != null) { xContentBuilder.field("allow_partial_search_results", request.allowPartialSearchResults()); } + if (request.getCancelAfterTimeInterval() != null) { + xContentBuilder.field("cancel_after_time_interval", request.getCancelAfterTimeInterval().getStringRep()); + } xContentBuilder.endObject(); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java index 465e3c47681bd..e761570b2c56b 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequest.java @@ -33,6 +33,7 @@ package org.opensearch.action.search; import org.opensearch.LegacyESVersion; +import org.opensearch.Version; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.IndicesRequest; @@ -114,6 +115,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; + private TimeValue cancelAfterTimeInterval; + public SearchRequest() { this.localClusterAlias = null; this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; @@ -191,6 +194,7 @@ private SearchRequest(SearchRequest searchRequest, String[] indices, String loca this.localClusterAlias = localClusterAlias; this.absoluteStartMillis = absoluteStartMillis; this.finalReduce = finalReduce; + this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval; } /** @@ -237,6 +241,10 @@ public SearchRequest(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(LegacyESVersion.V_7_0_0)) { ccsMinimizeRoundtrips = in.readBoolean(); } + + if (in.getVersion().onOrAfter(Version.V_1_1_0)) { + cancelAfterTimeInterval = in.readOptionalTimeValue(); + } } @Override @@ -271,6 +279,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(LegacyESVersion.V_7_0_0)) { out.writeBoolean(ccsMinimizeRoundtrips); } + + if (out.getVersion().onOrAfter(Version.V_1_1_0)) { + out.writeOptionalTimeValue(cancelAfterTimeInterval); + } } @Override @@ -669,9 +681,17 @@ public static int resolveTrackTotalHitsUpTo(Scroll scroll, SearchSourceBuilder s SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : source.trackTotalHitsUpTo(); } + public void setCancelAfterTimeInterval(TimeValue cancelAfterTimeInterval) { + this.cancelAfterTimeInterval = cancelAfterTimeInterval; + } + + public TimeValue getCancelAfterTimeInterval() { + return cancelAfterTimeInterval; + } + @Override public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers); + return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval); } public final String buildDescription() { @@ -718,14 +738,15 @@ public boolean equals(Object o) { Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) && Objects.equals(localClusterAlias, that.localClusterAlias) && absoluteStartMillis == that.absoluteStartMillis && - ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips; + ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips && + Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval); } @Override public int hashCode() { return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache, scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize, - allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips); + allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips, cancelAfterTimeInterval); } @Override @@ -746,6 +767,7 @@ public String toString() { ", localClusterAlias=" + localClusterAlias + ", getOrCreateAbsoluteStartMillis=" + absoluteStartMillis + ", ccsMinimizeRoundtrips=" + ccsMinimizeRoundtrips + - ", source=" + source + '}'; + ", source=" + source + + ", cancelAfterTimeInterval=" + cancelAfterTimeInterval + "}"; } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java b/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java index 3bbe8e84627bc..1bc13979fac28 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java @@ -626,4 +626,12 @@ public SearchRequestBuilder setPreFilterShardSize(int preFilterShardSize) { this.request.setPreFilterShardSize(preFilterShardSize); return this; } + + /** + * Request level time interval to control how long search is allowed to execute after which it is cancelled. + */ + public SearchRequestBuilder setCancelAfterTimeInterval(TimeValue cancelAfterTimeInterval) { + this.request.setCancelAfterTimeInterval(cancelAfterTimeInterval); + return this; + } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchTask.java b/server/src/main/java/org/opensearch/action/search/SearchTask.java index 35eaaeb3a2c31..16a1f95051462 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTask.java @@ -32,12 +32,15 @@ package org.opensearch.action.search; +import org.opensearch.common.unit.TimeValue; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.TaskId; import java.util.Map; import java.util.function.Supplier; +import static org.opensearch.search.SearchService.NO_TIMEOUT; + /** * Task storing information about a currently running {@link SearchRequest}. */ @@ -46,9 +49,14 @@ public class SearchTask extends CancellableTask { private final Supplier descriptionSupplier; private SearchProgressListener progressListener = SearchProgressListener.NOOP; - public SearchTask(long id, String type, String action, Supplier descriptionSupplier, - TaskId parentTaskId, Map headers) { - super(id, type, action, null, parentTaskId, headers); + public SearchTask(long id, String type, String action, Supplier descriptionSupplier, TaskId parentTaskId, + Map headers) { + this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT); + } + + public SearchTask(long id, String type, String action, Supplier descriptionSupplier, TaskId parentTaskId, + Map headers, TimeValue cancelAfterTimeInterval) { + super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval); this.descriptionSupplier = descriptionSupplier; } diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 6d1dc124780ad..17bfbc0aec190 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -41,6 +41,7 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.TimeoutTaskCancellationUtility; import org.opensearch.client.Client; import org.opensearch.client.OriginSettingClient; import org.opensearch.client.node.NodeClient; @@ -81,6 +82,7 @@ import org.opensearch.search.internal.SearchContext; import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.profile.SearchProfileShardResults; +import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskId; import org.opensearch.threadpool.ThreadPool; @@ -121,6 +123,13 @@ public class TransportSearchAction extends HandledTransportAction SHARD_COUNT_LIMIT_SETTING = Setting.longSetting( "action.search.shard_count.limit", Long.MAX_VALUE, 1L, Property.Dynamic, Property.NodeScope); + // cluster level setting for timeout based search cancellation. If search request level parameter is present then that will take + // precedence over the cluster setting value + public static final String SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY = "search.cancel_after_time_interval"; + public static final Setting SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING = + Setting.timeSetting(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, SearchService.NO_TIMEOUT, Setting.Property.Dynamic, + Setting.Property.NodeScope); + private final NodeClient client; private final ThreadPool threadPool; private final ClusterService clusterService; @@ -239,6 +248,14 @@ long buildTookInMillis() { @Override protected void doExecute(Task task, SearchRequest searchRequest, ActionListener listener) { + // only if task is of type CancellableTask and support cancellation on timeout, treat this request eligible for timeout based + // cancellation. There may be other top level requests like AsyncSearch which is using SearchRequest internally and has it's own + // cancellation mechanism. For such cases, the SearchRequest when created can override the createTask and set the + // cancelAfterTimeInterval to NO_TIMEOUT and bypass this mechanism + if (task instanceof CancellableTask) { + listener = TimeoutTaskCancellationUtility.wrapWithCancellationListener(client, (CancellableTask) task, + clusterService.getClusterSettings(), listener); + } executeRequest(task, searchRequest, this::searchAsyncAction, listener); } diff --git a/server/src/main/java/org/opensearch/action/support/TimeoutTaskCancellationUtility.java b/server/src/main/java/org/opensearch/action/support/TimeoutTaskCancellationUtility.java new file mode 100644 index 0000000000000..a8acc82a7a18e --- /dev/null +++ b/server/src/main/java/org/opensearch/action/support/TimeoutTaskCancellationUtility.java @@ -0,0 +1,135 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.support; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.opensearch.client.OriginSettingClient; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.search.SearchService; +import org.opensearch.tasks.CancellableTask; +import org.opensearch.tasks.TaskId; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.opensearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; +import static org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING; + +public class TimeoutTaskCancellationUtility { + + private static final Logger logger = LogManager.getLogger(TimeoutTaskCancellationUtility.class); + + /** + * Wraps a listener with a timeout listener {@link TimeoutRunnableListener} to schedule the task cancellation for provided tasks on + * generic thread pool + * @param client - {@link NodeClient} + * @param taskToCancel - task to schedule cancellation for + * @param clusterSettings - {@link ClusterSettings} + * @param listener - original listener associated with the task + * @return wrapped listener + */ + public static ActionListener wrapWithCancellationListener(NodeClient client, CancellableTask taskToCancel, + ClusterSettings clusterSettings, ActionListener listener) { + final TimeValue globalTimeout = clusterSettings.get(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING); + final TimeValue timeoutInterval = (taskToCancel.getCancellationTimeout() == null) ? globalTimeout + : taskToCancel.getCancellationTimeout(); + // Note: -1 (or no timeout) will help to turn off cancellation. The combinations will be request level set at -1 or request level + // set to null and cluster level set to -1. + ActionListener listenerToReturn = listener; + if (timeoutInterval.equals(SearchService.NO_TIMEOUT)) { + return listenerToReturn; + } + + try { + final TimeoutRunnableListener wrappedListener = new TimeoutRunnableListener<>(timeoutInterval, listener, () -> { + final CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); + cancelTasksRequest.setTaskId(new TaskId(client.getLocalNodeId(), taskToCancel.getId())); + cancelTasksRequest.setReason("Cancellation timeout of " + timeoutInterval + " is expired"); + // force the origin to execute the cancellation as a system user + new OriginSettingClient(client, TASKS_ORIGIN).admin().cluster() + .cancelTasks(cancelTasksRequest, ActionListener.wrap(r -> logger.debug( + "Scheduled cancel task with timeout: {} for original task: {} is successfully completed", timeoutInterval, + cancelTasksRequest.getTaskId()), + e -> logger.error(new ParameterizedMessage("Scheduled cancel task with timeout: {} for original task: {} is failed", + timeoutInterval, cancelTasksRequest.getTaskId()), e)) + ); + }); + wrappedListener.cancellable = client.threadPool().schedule(wrappedListener, timeoutInterval, ThreadPool.Names.GENERIC); + listenerToReturn = wrappedListener; + } catch (Exception ex) { + // if there is any exception in scheduling the cancellation task then continue without it + logger.warn("Failed to schedule the cancellation task for original task: {}, will continue without it", taskToCancel.getId()); + } + return listenerToReturn; + } + + /** + * Timeout listener which executes the provided runnable after timeout is expired and if a response/failure is not yet received. + * If either a response/failure is received before timeout then the scheduled task is cancelled and response/failure is sent back to + * the original listener. + */ + private static class TimeoutRunnableListener implements ActionListener, Runnable { + + private static final Logger logger = LogManager.getLogger(TimeoutRunnableListener.class); + + // Runnable to execute after timeout + private final TimeValue timeout; + private final ActionListener originalListener; + private final Runnable timeoutRunnable; + private final AtomicBoolean executeRunnable = new AtomicBoolean(true); + private volatile Scheduler.ScheduledCancellable cancellable; + private final long creationTime; + + TimeoutRunnableListener(TimeValue timeout, ActionListener listener, Runnable runAfterTimeout) { + this.timeout = timeout; + this.originalListener = listener; + this.timeoutRunnable = runAfterTimeout; + this.creationTime = System.nanoTime(); + } + + @Override public void onResponse(Response response) { + checkAndCancel(); + originalListener.onResponse(response); + } + + @Override public void onFailure(Exception e) { + checkAndCancel(); + originalListener.onFailure(e); + } + + @Override public void run() { + try { + if (executeRunnable.compareAndSet(true, false)) { + timeoutRunnable.run(); + } // else do nothing since either response/failure is already sent to client + } catch (Exception ex) { + // ignore the exception + logger.error(new ParameterizedMessage("Ignoring the failure to run the provided runnable after timeout of {} with " + + "exception", timeout), ex); + } + } + + private void checkAndCancel() { + if (executeRunnable.compareAndSet(true, false)) { + logger.debug("Aborting the scheduled cancel task after {}", + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - creationTime)); + // timer has not yet expired so cancel it + cancellable.cancel(); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index d6e6232e04dfc..fdd48fe0ee2af 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -345,6 +345,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, + TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE, SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER, diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java index 869ce4eb3ca26..5ef4e768d0f48 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java @@ -44,6 +44,7 @@ import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.logging.DeprecationLogger; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContent; import org.opensearch.common.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentType; @@ -158,6 +159,7 @@ public static MultiSearchRequest parseRequest(RestRequest restRequest, multiRequest.add(searchRequest); }); List requests = multiRequest.requests(); + final TimeValue cancelAfterTimeInterval = restRequest.paramAsTime("cancel_after_time_interval", null); for (SearchRequest request : requests) { // preserve if it's set on the request if (preFilterShardSize != null && request.getPreFilterShardSize() == null) { @@ -166,6 +168,11 @@ public static MultiSearchRequest parseRequest(RestRequest restRequest, if (maxConcurrentShardRequests != null) { request.setMaxConcurrentShardRequests(maxConcurrentShardRequests); } + // if cancel_after_time_interval parameter is set at per search request level than that is used otherwise one set at + // multi search request level will be used + if (request.getCancelAfterTimeInterval() == null) { + request.setCancelAfterTimeInterval(cancelAfterTimeInterval); + } } return multiRequest; } diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java index 6e1823340868f..49a598c813fc4 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java @@ -208,6 +208,8 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r searchRequest.setCcsMinimizeRoundtrips( request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips())); } + + searchRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", null)); } /** diff --git a/server/src/main/java/org/opensearch/search/dfs/DfsPhase.java b/server/src/main/java/org/opensearch/search/dfs/DfsPhase.java index d7e127646d372..2bdf4719d8066 100644 --- a/server/src/main/java/org/opensearch/search/dfs/DfsPhase.java +++ b/server/src/main/java/org/opensearch/search/dfs/DfsPhase.java @@ -62,7 +62,7 @@ public void execute(SearchContext context) { @Override public TermStatistics termStatistics(Term term, int docFreq, long totalTermFreq) throws IOException { if (context.isCancelled()) { - throw new TaskCancelledException("cancelled"); + throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled()); } TermStatistics ts = super.termStatistics(term, docFreq, totalTermFreq); if (ts != null) { @@ -74,7 +74,7 @@ public TermStatistics termStatistics(Term term, int docFreq, long totalTermFreq) @Override public CollectionStatistics collectionStatistics(String field) throws IOException { if (context.isCancelled()) { - throw new TaskCancelledException("cancelled"); + throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled()); } CollectionStatistics cs = super.collectionStatistics(field); if (cs != null) { diff --git a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java index 6fab0b1952d67..995afaf650121 100644 --- a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java +++ b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java @@ -108,7 +108,7 @@ public void execute(SearchContext context) { } if (context.isCancelled()) { - throw new TaskCancelledException("cancelled"); + throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled()); } if (context.docIdsToLoadSize() == 0) { @@ -140,7 +140,7 @@ public void execute(SearchContext context) { boolean hasSequentialDocs = hasSequentialDocs(docs); for (int index = 0; index < context.docIdsToLoadSize(); index++) { if (context.isCancelled()) { - throw new TaskCancelledException("cancelled"); + throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled()); } int docId = docs[index].docId; try { @@ -181,7 +181,7 @@ public void execute(SearchContext context) { } } if (context.isCancelled()) { - throw new TaskCancelledException("cancelled"); + throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled()); } TotalHits totalHits = context.queryResult().getTotalHits(); diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhase.java b/server/src/main/java/org/opensearch/search/query/QueryPhase.java index c70f3ee0901d6..8c0e944f16da9 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/opensearch/search/query/QueryPhase.java @@ -126,7 +126,7 @@ public void preProcess(SearchContext context) { cancellation = context.searcher().addQueryCancellation(() -> { SearchShardTask task = context.getTask(); if (task != null && task.isCancelled()) { - throw new TaskCancelledException("cancelled"); + throw new TaskCancelledException("cancelled task with reason: " + task.getReasonCancelled()); } }); } else { @@ -295,7 +295,7 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe searcher.addQueryCancellation(() -> { SearchShardTask task = searchContext.getTask(); if (task != null && task.isCancelled()) { - throw new TaskCancelledException("cancelled"); + throw new TaskCancelledException("cancelled task with reason: " + task.getReasonCancelled()); } }); } diff --git a/server/src/main/java/org/opensearch/tasks/CancellableTask.java b/server/src/main/java/org/opensearch/tasks/CancellableTask.java index 7c5fe4ea0ffa6..ce12c0a0cfcb7 100644 --- a/server/src/main/java/org/opensearch/tasks/CancellableTask.java +++ b/server/src/main/java/org/opensearch/tasks/CancellableTask.java @@ -33,10 +33,13 @@ package org.opensearch.tasks; import org.opensearch.common.Nullable; +import org.opensearch.common.unit.TimeValue; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import static org.opensearch.search.SearchService.NO_TIMEOUT; + /** * A task that can be canceled */ @@ -44,9 +47,16 @@ public abstract class CancellableTask extends Task { private volatile String reason; private final AtomicBoolean cancelled = new AtomicBoolean(false); + private final TimeValue cancelAfterTimeInterval; public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT); + } + + public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers, + TimeValue cancelAfterTimeInterval) { super(id, type, action, description, parentTaskId, headers); + this.cancelAfterTimeInterval = cancelAfterTimeInterval; } /** @@ -77,6 +87,10 @@ public boolean isCancelled() { return cancelled.get(); } + public TimeValue getCancellationTimeout() { + return cancelAfterTimeInterval; + } + /** * The reason the task was cancelled or null if it hasn't been cancelled. */ diff --git a/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java b/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java index 588d9a0ce34d5..eb021de7725d1 100644 --- a/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java +++ b/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java @@ -32,6 +32,7 @@ package org.opensearch.action.search; +import org.opensearch.Version; import org.opensearch.action.support.IndicesOptions; import org.opensearch.common.CheckedBiConsumer; import org.opensearch.common.CheckedRunnable; @@ -39,7 +40,9 @@ import org.opensearch.common.Strings; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.logging.DeprecationLogger; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentHelper; @@ -54,6 +57,7 @@ import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.StreamsUtils; +import org.opensearch.test.VersionUtils; import org.opensearch.test.rest.FakeRestRequest; import java.io.IOException; @@ -62,6 +66,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static java.util.Collections.singletonList; import static org.opensearch.search.RandomSearchRequestGenerator.randomSearchRequest; @@ -136,6 +141,38 @@ public void testSimpleAddWithCarriageReturn() throws Exception { assertThat(request.requests().get(0).types().length, equalTo(0)); } + public void testCancelAfterIntervalAtParentAndFewChildRequest() throws Exception { + final String requestContent = "{\"index\":\"test\", \"expand_wildcards\" : \"open,closed\", " + + "\"cancel_after_time_interval\" : \"10s\"}\r\n" + + "{\"query\" : {\"match_all\" :{}}}\r\n {\"search_type\" : \"dfs_query_then_fetch\"}\n" + + "{\"query\" : {\"match_all\" :{}}}\r\n"; + FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()) + .withContent(new BytesArray(requestContent), XContentType.JSON) + .withParams(Collections.singletonMap("cancel_after_time_interval", "20s")) + .build(); + MultiSearchRequest request = RestMultiSearchAction.parseRequest(restRequest, null, true); + assertThat(request.requests().size(), equalTo(2)); + assertThat(request.requests().get(0).indices()[0], equalTo("test")); + // verifies that child search request parameter value is used for first search request + assertEquals(new TimeValue(10, TimeUnit.SECONDS), request.requests().get(0).getCancelAfterTimeInterval()); + // verifies that parent msearch parameter value is used for second search request + assertEquals(request.requests().get(1).searchType(), SearchType.DFS_QUERY_THEN_FETCH); + assertEquals(new TimeValue(20, TimeUnit.SECONDS), request.requests().get(1).getCancelAfterTimeInterval()); + } + + public void testOnlyParentMSearchRequestWithCancelAfterTimeIntervalParameter() throws IOException { + final String requestContent = "{\"index\":\"test\", \"expand_wildcards\" : \"open,closed\"}}\r\n" + + "{\"query\" : {\"match_all\" :{}}}\r\n"; + FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()) + .withContent(new BytesArray(requestContent), XContentType.JSON) + .withParams(Collections.singletonMap("cancel_after_time_interval", "20s")) + .build(); + MultiSearchRequest request = RestMultiSearchAction.parseRequest(restRequest, null, true); + assertThat(request.requests().size(), equalTo(1)); + assertThat(request.requests().get(0).indices()[0], equalTo("test")); + assertEquals(new TimeValue(20, TimeUnit.SECONDS), request.requests().get(0).getCancelAfterTimeInterval()); + } + public void testDefaultIndicesOptions() throws IOException { final String requestContent = "{\"index\":\"test\", \"expand_wildcards\" : \"open,closed\"}}\r\n" + "{\"query\" : {\"match_all\" :{}}}\r\n"; @@ -316,6 +353,12 @@ protected NamedXContentRegistry xContentRegistry() { new ParseField(MatchAllQueryBuilder.NAME), (p, c) -> MatchAllQueryBuilder.fromXContent(p)))); } + @Override + protected NamedWriteableRegistry writableRegistry() { + return new NamedWriteableRegistry(singletonList(new NamedWriteableRegistry.Entry(QueryBuilder.class, + MatchAllQueryBuilder.NAME, MatchAllQueryBuilder::new))); + } + public void testMultiLineSerialization() throws IOException { int iters = 16; for (int i = 0; i < iters; i++) { @@ -338,6 +381,24 @@ public void testMultiLineSerialization() throws IOException { } } + public void testSerDeWithCancelAfterTimeIntervalParameterAndRandomVersion() throws IOException { + final String requestContent = "{\"index\":\"test\", \"expand_wildcards\" : \"open,closed\", " + + "\"cancel_after_time_interval\" : \"10s\"}\r\n{\"query\" : {\"match_all\" :{}}}\r\n"; + FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()) + .withContent(new BytesArray(requestContent), XContentType.JSON) + .build(); + Version version = VersionUtils.randomVersion(random()); + MultiSearchRequest originalRequest = RestMultiSearchAction.parseRequest(restRequest, null, true); + MultiSearchRequest deserializedRequest = copyWriteable(originalRequest, writableRegistry(), MultiSearchRequest::new, version); + + if (version.before(Version.V_1_1_0)) { + assertNull(deserializedRequest.requests().get(0).getCancelAfterTimeInterval()); + } else { + assertEquals(originalRequest.requests().get(0).getCancelAfterTimeInterval(), + deserializedRequest.requests().get(0).getCancelAfterTimeInterval()); + } + } + public void testWritingExpandWildcards() throws IOException { assertExpandWildcardsValue(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), true, true, true, randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()), "all"); diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java index 24807b2f73b29..29fc6099c4778 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java @@ -115,6 +115,12 @@ public void testRandomVersionSerialization() throws IOException { assertEquals(searchRequest.getAbsoluteStartMillis(), deserializedRequest.getAbsoluteStartMillis()); assertEquals(searchRequest.isFinalReduce(), deserializedRequest.isFinalReduce()); } + + if (version.onOrAfter(Version.V_1_1_0)) { + assertEquals(searchRequest.getCancelAfterTimeInterval(), deserializedRequest.getCancelAfterTimeInterval()); + } else { + assertNull(deserializedRequest.getCancelAfterTimeInterval()); + } } public void testReadFromPre6_7_0() throws IOException { @@ -261,6 +267,8 @@ private SearchRequest mutate(SearchRequest searchRequest) { () -> randomFrom(SearchType.DFS_QUERY_THEN_FETCH, SearchType.QUERY_THEN_FETCH)))); mutators.add(() -> mutation.source(randomValueOtherThan(searchRequest.source(), this::createSearchSourceBuilder))); mutators.add(() -> mutation.setCcsMinimizeRoundtrips(searchRequest.isCcsMinimizeRoundtrips() == false)); + mutators.add(() -> mutation.setCancelAfterTimeInterval(searchRequest.getCancelAfterTimeInterval() != null + ? null : TimeValue.parseTimeValue(randomTimeValue(), null, "cancel_after_time_interval"))); randomFrom(mutators).run(); return mutation; } diff --git a/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java b/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java index 1ea6f8c58eae7..a873fb04e81b3 100644 --- a/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java +++ b/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java @@ -130,6 +130,10 @@ public static SearchRequest randomSearchRequest(Supplier ra if (randomBoolean()) { searchRequest.source(randomSearchSourceBuilder.get()); } + if (randomBoolean()) { + searchRequest.setCancelAfterTimeInterval( + TimeValue.parseTimeValue(randomTimeValue(), null, "cancel_after_time_interval")); + } return searchRequest; }