From 70eabe5d36bbc171c1551d4d11e71d7df78745ec Mon Sep 17 00:00:00 2001 From: Sorabh Date: Thu, 12 Aug 2021 08:01:28 -0700 Subject: [PATCH] Part 1: Support for cancel_after_timeinterval parameter in search and msearch request (#986) * Part 1: Support for cancel_after_timeinterval parameter in search and msearch request This commit introduces the new request level parameter to configure the timeout interval after which a search request will be cancelled. For msearch request the parameter is supported both at parent request and at sub child search requests. If it is provided at parent level and child search request doesn't have it then the parent level value is set at such child request. The parent level msearch is not used to cancel the parent request as it may be tricky to come up with correct value in cases when child search request can have different runtimes TEST: Added test for ser/de with new parameter Signed-off-by: Sorabh Hamirwasia * Part 2: Support for cancel_after_timeinterval parameter in search and msearch request This commit adds the handling of the new request level parameter and schedule cancellation task. It also adds a cluster setting to set a global cancellation timeout for search request which will be used in absence of request level timeout. TEST: Added new tests in SearchCancellationIT Signed-off-by: Sorabh Hamirwasia * Address Review feedback for Part 1 Signed-off-by: Sorabh Hamirwasia * Address review feedback for Part 2 Signed-off-by: Sorabh Hamirwasia * Update CancellableTask to remove the cancelOnTimeout boolean flag Signed-off-by: Sorabh Hamirwasia * Replace search.cancellation.timeout cluster setting with search.enforce_server.timeout.cancellation to control if cluster level cancel_after_time_interval should take precedence over request level cancel_after_time_interval value Signed-off-by: Sorabh Hamirwasia * Removing the search.enforce_server.timeout.cancellation cluster setting and just keeping search.cancel_after_time_interval setting with request level parameter taking the precedence. Signed-off-by: Sorabh Hamirwasia Co-authored-by: Sorabh Hamirwasia --- .../search/SearchCancellationIT.java | 285 +++++++++++++++++- .../action/search/MultiSearchRequest.java | 7 + .../action/search/SearchRequest.java | 30 +- .../action/search/SearchRequestBuilder.java | 8 + .../opensearch/action/search/SearchTask.java | 14 +- .../action/search/TransportSearchAction.java | 17 ++ .../TimeoutTaskCancellationUtility.java | 135 +++++++++ .../common/settings/ClusterSettings.java | 1 + .../action/search/RestMultiSearchAction.java | 7 + .../rest/action/search/RestSearchAction.java | 2 + .../org/opensearch/search/dfs/DfsPhase.java | 4 +- .../opensearch/search/fetch/FetchPhase.java | 6 +- .../opensearch/search/query/QueryPhase.java | 4 +- .../org/opensearch/tasks/CancellableTask.java | 14 + .../search/MultiSearchRequestTests.java | 61 ++++ .../action/search/SearchRequestTests.java | 8 + .../search/RandomSearchRequestGenerator.java | 4 + 17 files changed, 590 insertions(+), 17 deletions(-) create mode 100644 server/src/main/java/org/opensearch/action/support/TimeoutTaskCancellationUtility.java diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java index 60f7f64837f5d..0dfa9e1cf9637 100644 --- a/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchCancellationIT.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.LogManager; +import org.junit.After; import org.opensearch.ExceptionsHelper; import org.opensearch.action.ActionFuture; import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse; @@ -59,18 +60,24 @@ import org.opensearch.tasks.TaskCancelledException; import org.opensearch.tasks.TaskInfo; import org.opensearch.test.OpenSearchIntegTestCase; +import org.opensearch.transport.TransportException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; +import static org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY; import static org.opensearch.index.query.QueryBuilders.scriptQuery; import static org.opensearch.search.SearchCancellationIT.ScriptedBlockPlugin.SCRIPT_NAME; +import static org.opensearch.search.SearchService.NO_TIMEOUT; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertFailures; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertNoFailures; import static org.hamcrest.Matchers.equalTo; @@ -96,6 +103,11 @@ protected Settings nodeSettings(int nodeOrdinal) { .build(); } + @After + public void cleanup() { + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder().putNull("*")).get(); + } + private void indexTestData() { for (int i = 0; i < 5; i++) { // Make sure we have a few segments @@ -153,15 +165,51 @@ private SearchResponse ensureSearchWasCancelled(ActionFuture sea SearchResponse response = searchResponse.actionGet(); logger.info("Search response {}", response); assertNotEquals("At least one shard should have failed", 0, response.getFailedShards()); + verifyCancellationException(response.getShardFailures()); return response; } catch (SearchPhaseExecutionException ex) { logger.info("All shards failed with", ex); + verifyCancellationException(ex.shardFailures()); return null; } } - public void testCancellationDuringQueryPhase() throws Exception { + private void ensureMSearchWasCancelled(ActionFuture mSearchResponse, + Set expectedFailedChildRequests) { + MultiSearchResponse response = mSearchResponse.actionGet(); + Set actualFailedChildRequests = new HashSet<>(); + for (int i = 0; i < response.getResponses().length; ++i) { + SearchResponse sResponse = response.getResponses()[i].getResponse(); + // check if response is null means all the shard failed for this search request + if (sResponse == null) { + Exception ex = response.getResponses()[i].getFailure(); + assertTrue(ex instanceof SearchPhaseExecutionException); + verifyCancellationException(((SearchPhaseExecutionException)ex).shardFailures()); + actualFailedChildRequests.add(i); + + } else if (sResponse.getShardFailures().length > 0) { + verifyCancellationException(sResponse.getShardFailures()); + actualFailedChildRequests.add(i); + } + } + assertEquals("Actual child request with cancellation failure is different that expected", expectedFailedChildRequests, + actualFailedChildRequests); + } + private void verifyCancellationException(ShardSearchFailure[] failures) { + for (ShardSearchFailure searchFailure : failures) { + // failure may happen while executing the search or while sending shard request for next phase. + // Below assertion is handling both the cases + final Throwable topFailureCause = searchFailure.getCause(); + assertTrue(searchFailure.toString(), topFailureCause instanceof TransportException || + topFailureCause instanceof TaskCancelledException); + if (topFailureCause instanceof TransportException) { + assertTrue(topFailureCause.getCause() instanceof TaskCancelledException); + } + } + } + + public void testCancellationDuringQueryPhase() throws Exception { List plugins = initBlockFactory(); indexTestData(); @@ -178,8 +226,49 @@ public void testCancellationDuringQueryPhase() throws Exception { ensureSearchWasCancelled(searchResponse); } - public void testCancellationDuringFetchPhase() throws Exception { + public void testCancellationDuringQueryPhaseUsingRequestParameter() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + + TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + ActionFuture searchResponse = client().prepareSearch("test") + .setCancelAfterTimeInterval(cancellationTimeout) + .setAllowPartialSearchResults(randomBoolean()) + .setQuery( + scriptQuery(new Script( + ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .execute(); + awaitForBlock(plugins); + // sleep for cancellation timeout to ensure scheduled cancellation task is actually executed + Thread.sleep(cancellationTimeout.getMillis()); + // unblock the search thread + disableBlocks(plugins); + ensureSearchWasCancelled(searchResponse); + } + + public void testCancellationDuringQueryPhaseUsingClusterSetting() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + + TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() + .put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, cancellationTimeout) + .build()).get(); + ActionFuture searchResponse = client().prepareSearch("test") + .setAllowPartialSearchResults(randomBoolean()) + .setQuery( + scriptQuery(new Script( + ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .execute(); + awaitForBlock(plugins); + // sleep for cluster cancellation timeout to ensure scheduled cancellation task is actually executed + Thread.sleep(cancellationTimeout.getMillis()); + // unblock the search thread + disableBlocks(plugins); + ensureSearchWasCancelled(searchResponse); + } + public void testCancellationDuringFetchPhase() throws Exception { List plugins = initBlockFactory(); indexTestData(); @@ -196,8 +285,24 @@ public void testCancellationDuringFetchPhase() throws Exception { ensureSearchWasCancelled(searchResponse); } - public void testCancellationOfScrollSearches() throws Exception { + public void testCancellationDuringFetchPhaseUsingRequestParameter() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + ActionFuture searchResponse = client().prepareSearch("test") + .setCancelAfterTimeInterval(cancellationTimeout) + .addScriptField("test_field", + new Script(ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()) + ).execute(); + awaitForBlock(plugins); + // sleep for request cancellation timeout to ensure scheduled cancellation task is actually executed + Thread.sleep(cancellationTimeout.getMillis()); + // unblock the search thread + disableBlocks(plugins); + ensureSearchWasCancelled(searchResponse); + } + public void testCancellationOfScrollSearches() throws Exception { List plugins = initBlockFactory(); indexTestData(); @@ -221,6 +326,29 @@ public void testCancellationOfScrollSearches() throws Exception { } } + public void testCancellationOfFirstScrollSearchRequestUsingRequestParameter() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + ActionFuture searchResponse = client().prepareSearch("test") + .setScroll(TimeValue.timeValueSeconds(10)) + .setCancelAfterTimeInterval(cancellationTimeout) + .setSize(5) + .setQuery( + scriptQuery(new Script( + ScriptType.INLINE, "mockscript", SCRIPT_NAME, Collections.emptyMap()))) + .execute(); + + awaitForBlock(plugins); + Thread.sleep(cancellationTimeout.getMillis()); + disableBlocks(plugins); + SearchResponse response = ensureSearchWasCancelled(searchResponse); + if (response != null) { + // The response might not have failed on all shards - we need to clean scroll + logger.info("Cleaning scroll with id {}", response.getScrollId()); + client().prepareClearScroll().addScrollId(response.getScrollId()).get(); + } + } public void testCancellationOfScrollSearchesOnFollowupRequests() throws Exception { @@ -266,6 +394,93 @@ public void testCancellationOfScrollSearchesOnFollowupRequests() throws Exceptio client().prepareClearScroll().addScrollId(scrollId).get(); } + public void testNoCancellationOfScrollSearchOnFollowUpRequest() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + + // Disable block so the first request would pass + disableBlocks(plugins); + TimeValue keepAlive = TimeValue.timeValueSeconds(5); + TimeValue cancellationTimeout = TimeValue.timeValueSeconds(2); + SearchResponse searchResponse = client().prepareSearch("test") + .setScroll(keepAlive) + .setCancelAfterTimeInterval(cancellationTimeout) + .setSize(2) + .setQuery( + scriptQuery(new Script( + ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .get(); + + assertNotNull(searchResponse.getScrollId()); + // since the previous scroll response is received before cancellation timeout, the scheduled task will be cancelled. It will not + // be used for the subsequent scroll request, as request is of SearchScrollRequest type instead of SearchRequest type + // Enable block so the second request would block + for (ScriptedBlockPlugin plugin : plugins) { + plugin.reset(); + plugin.enableBlock(); + } + + String scrollId = searchResponse.getScrollId(); + ActionFuture scrollResponse = client().prepareSearchScroll(searchResponse.getScrollId()) + .setScroll(keepAlive).execute(); + + awaitForBlock(plugins); + // sleep for cancellation timeout to ensure there is no scheduled task for cancellation + Thread.sleep(cancellationTimeout.getMillis()); + disableBlocks(plugins); + + // wait for response and ensure there is no failure + SearchResponse response = scrollResponse.get(); + assertEquals(0, response.getFailedShards()); + scrollId = response.getScrollId(); + client().prepareClearScroll().addScrollId(scrollId).get(); + } + + public void testDisableCancellationAtRequestLevel() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() + .put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, cancellationTimeout) + .build()).get(); + ActionFuture searchResponse = client().prepareSearch("test") + .setAllowPartialSearchResults(randomBoolean()) + .setCancelAfterTimeInterval(NO_TIMEOUT) + .setQuery( + scriptQuery(new Script( + ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .execute(); + awaitForBlock(plugins); + // sleep for cancellation timeout to ensure there is no scheduled task for cancellation + Thread.sleep(cancellationTimeout.getMillis()); + // unblock the search thread + disableBlocks(plugins); + // ensure search was successful since cancellation was disabled at request level + assertEquals(0, searchResponse.get().getFailedShards()); + } + + public void testDisableCancellationAtClusterLevel() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() + .put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, NO_TIMEOUT) + .build()).get(); + ActionFuture searchResponse = client().prepareSearch("test") + .setAllowPartialSearchResults(randomBoolean()) + .setQuery( + scriptQuery(new Script( + ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, Collections.emptyMap()))) + .execute(); + awaitForBlock(plugins); + // sleep for cancellation timeout to ensure there is no scheduled task for cancellation + Thread.sleep(cancellationTimeout.getMillis()); + // unblock the search thread + disableBlocks(plugins); + // ensure search was successful since cancellation was disabled at request level + assertEquals(0, searchResponse.get().getFailedShards()); + } + public void testCancelMultiSearch() throws Exception { List plugins = initBlockFactory(); indexTestData(); @@ -287,6 +502,70 @@ public void testCancelMultiSearch() throws Exception { } } + public void testMSearchChildRequestCancellationWithClusterLevelTimeout() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + TimeValue cancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() + .put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, cancellationTimeout) + .build()).get(); + ActionFuture mSearchResponse = client().prepareMultiSearch() + .add(client().prepareSearch("test").setAllowPartialSearchResults(randomBoolean()) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, + Collections.emptyMap())))) + .add(client().prepareSearch("test").setAllowPartialSearchResults(randomBoolean()).setRequestCache(false) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, + Collections.emptyMap())))) + .execute(); + awaitForBlock(plugins); + // sleep for cluster cancellation timeout to ensure scheduled cancellation task is actually executed + Thread.sleep(cancellationTimeout.getMillis()); + // unblock the search thread + disableBlocks(plugins); + // both child requests are expected to fail + final Set expectedFailedRequests = new HashSet<>(); + expectedFailedRequests.add(0); + expectedFailedRequests.add(1); + ensureMSearchWasCancelled(mSearchResponse, expectedFailedRequests); + } + + /** + * Verifies cancellation of sub search request with mix of request level and cluster level timeout parameter + * @throws Exception in case of unexpected errors + */ + public void testMSearchChildReqCancellationWithHybridTimeout() throws Exception { + List plugins = initBlockFactory(); + indexTestData(); + TimeValue reqCancellationTimeout = new TimeValue(2, TimeUnit.SECONDS); + TimeValue clusterCancellationTimeout = new TimeValue(3, TimeUnit.SECONDS); + client().admin().cluster().prepareUpdateSettings().setPersistentSettings(Settings.builder() + .put(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, clusterCancellationTimeout) + .build()).get(); + ActionFuture mSearchResponse = client().prepareMultiSearch() + .add(client().prepareSearch("test").setAllowPartialSearchResults(randomBoolean()) + .setCancelAfterTimeInterval(reqCancellationTimeout) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, + Collections.emptyMap())))) + .add(client().prepareSearch("test").setAllowPartialSearchResults(randomBoolean()) + .setCancelAfterTimeInterval(NO_TIMEOUT) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, + Collections.emptyMap())))) + .add(client().prepareSearch("test").setAllowPartialSearchResults(randomBoolean()).setRequestCache(false) + .setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedBlockPlugin.SCRIPT_NAME, + Collections.emptyMap())))) + .execute(); + awaitForBlock(plugins); + // sleep for cluster cancellation timeout to ensure scheduled cancellation task is actually executed + Thread.sleep(Math.max(reqCancellationTimeout.getMillis(), clusterCancellationTimeout.getMillis())); + // unblock the search thread + disableBlocks(plugins); + // only first and last child request are expected to fail + final Set expectedFailedRequests = new HashSet<>(); + expectedFailedRequests.add(0); + expectedFailedRequests.add(2); + ensureMSearchWasCancelled(mSearchResponse, expectedFailedRequests); + } + public static class ScriptedBlockPlugin extends MockScriptPlugin { static final String SCRIPT_NAME = "search_block"; diff --git a/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java b/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java index 0b217e892585c..7e075cede5688 100644 --- a/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java @@ -66,6 +66,7 @@ import static org.opensearch.common.xcontent.support.XContentMapValues.nodeBooleanValue; import static org.opensearch.common.xcontent.support.XContentMapValues.nodeStringArrayValue; import static org.opensearch.common.xcontent.support.XContentMapValues.nodeStringValue; +import static org.opensearch.common.xcontent.support.XContentMapValues.nodeTimeValue; /** * A multi search API request. @@ -272,6 +273,9 @@ public static void readMultiLineFormat(BytesReference data, allowNoIndices = value; } else if ("ignore_throttled".equals(entry.getKey()) || "ignoreThrottled".equals(entry.getKey())) { ignoreThrottled = value; + } else if ("cancel_after_time_interval".equals(entry.getKey()) || + "cancelAfterTimeInterval".equals(entry.getKey())) { + searchRequest.setCancelAfterTimeInterval(nodeTimeValue(value, null)); } else { throw new IllegalArgumentException("key [" + entry.getKey() + "] is not supported in the metadata section"); } @@ -362,6 +366,9 @@ public static void writeSearchRequestParams(SearchRequest request, XContentBuild if (request.allowPartialSearchResults() != null) { xContentBuilder.field("allow_partial_search_results", request.allowPartialSearchResults()); } + if (request.getCancelAfterTimeInterval() != null) { + xContentBuilder.field("cancel_after_time_interval", request.getCancelAfterTimeInterval().getStringRep()); + } xContentBuilder.endObject(); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java index 465e3c47681bd..e761570b2c56b 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequest.java @@ -33,6 +33,7 @@ package org.opensearch.action.search; import org.opensearch.LegacyESVersion; +import org.opensearch.Version; import org.opensearch.action.ActionRequest; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.IndicesRequest; @@ -114,6 +115,8 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla private IndicesOptions indicesOptions = DEFAULT_INDICES_OPTIONS; + private TimeValue cancelAfterTimeInterval; + public SearchRequest() { this.localClusterAlias = null; this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; @@ -191,6 +194,7 @@ private SearchRequest(SearchRequest searchRequest, String[] indices, String loca this.localClusterAlias = localClusterAlias; this.absoluteStartMillis = absoluteStartMillis; this.finalReduce = finalReduce; + this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval; } /** @@ -237,6 +241,10 @@ public SearchRequest(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(LegacyESVersion.V_7_0_0)) { ccsMinimizeRoundtrips = in.readBoolean(); } + + if (in.getVersion().onOrAfter(Version.V_1_1_0)) { + cancelAfterTimeInterval = in.readOptionalTimeValue(); + } } @Override @@ -271,6 +279,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(LegacyESVersion.V_7_0_0)) { out.writeBoolean(ccsMinimizeRoundtrips); } + + if (out.getVersion().onOrAfter(Version.V_1_1_0)) { + out.writeOptionalTimeValue(cancelAfterTimeInterval); + } } @Override @@ -669,9 +681,17 @@ public static int resolveTrackTotalHitsUpTo(Scroll scroll, SearchSourceBuilder s SearchContext.DEFAULT_TRACK_TOTAL_HITS_UP_TO : source.trackTotalHitsUpTo(); } + public void setCancelAfterTimeInterval(TimeValue cancelAfterTimeInterval) { + this.cancelAfterTimeInterval = cancelAfterTimeInterval; + } + + public TimeValue getCancelAfterTimeInterval() { + return cancelAfterTimeInterval; + } + @Override public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers); + return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval); } public final String buildDescription() { @@ -718,14 +738,15 @@ public boolean equals(Object o) { Objects.equals(allowPartialSearchResults, that.allowPartialSearchResults) && Objects.equals(localClusterAlias, that.localClusterAlias) && absoluteStartMillis == that.absoluteStartMillis && - ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips; + ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips && + Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval); } @Override public int hashCode() { return Objects.hash(searchType, Arrays.hashCode(indices), routing, preference, source, requestCache, scroll, Arrays.hashCode(types), indicesOptions, batchedReduceSize, maxConcurrentShardRequests, preFilterShardSize, - allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips); + allowPartialSearchResults, localClusterAlias, absoluteStartMillis, ccsMinimizeRoundtrips, cancelAfterTimeInterval); } @Override @@ -746,6 +767,7 @@ public String toString() { ", localClusterAlias=" + localClusterAlias + ", getOrCreateAbsoluteStartMillis=" + absoluteStartMillis + ", ccsMinimizeRoundtrips=" + ccsMinimizeRoundtrips + - ", source=" + source + '}'; + ", source=" + source + + ", cancelAfterTimeInterval=" + cancelAfterTimeInterval + "}"; } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java b/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java index 3bbe8e84627bc..1bc13979fac28 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequestBuilder.java @@ -626,4 +626,12 @@ public SearchRequestBuilder setPreFilterShardSize(int preFilterShardSize) { this.request.setPreFilterShardSize(preFilterShardSize); return this; } + + /** + * Request level time interval to control how long search is allowed to execute after which it is cancelled. + */ + public SearchRequestBuilder setCancelAfterTimeInterval(TimeValue cancelAfterTimeInterval) { + this.request.setCancelAfterTimeInterval(cancelAfterTimeInterval); + return this; + } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchTask.java b/server/src/main/java/org/opensearch/action/search/SearchTask.java index 35eaaeb3a2c31..16a1f95051462 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTask.java @@ -32,12 +32,15 @@ package org.opensearch.action.search; +import org.opensearch.common.unit.TimeValue; import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.TaskId; import java.util.Map; import java.util.function.Supplier; +import static org.opensearch.search.SearchService.NO_TIMEOUT; + /** * Task storing information about a currently running {@link SearchRequest}. */ @@ -46,9 +49,14 @@ public class SearchTask extends CancellableTask { private final Supplier descriptionSupplier; private SearchProgressListener progressListener = SearchProgressListener.NOOP; - public SearchTask(long id, String type, String action, Supplier descriptionSupplier, - TaskId parentTaskId, Map headers) { - super(id, type, action, null, parentTaskId, headers); + public SearchTask(long id, String type, String action, Supplier descriptionSupplier, TaskId parentTaskId, + Map headers) { + this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT); + } + + public SearchTask(long id, String type, String action, Supplier descriptionSupplier, TaskId parentTaskId, + Map headers, TimeValue cancelAfterTimeInterval) { + super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval); this.descriptionSupplier = descriptionSupplier; } diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 6d1dc124780ad..17bfbc0aec190 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -41,6 +41,7 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.TimeoutTaskCancellationUtility; import org.opensearch.client.Client; import org.opensearch.client.OriginSettingClient; import org.opensearch.client.node.NodeClient; @@ -81,6 +82,7 @@ import org.opensearch.search.internal.SearchContext; import org.opensearch.search.profile.ProfileShardResult; import org.opensearch.search.profile.SearchProfileShardResults; +import org.opensearch.tasks.CancellableTask; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskId; import org.opensearch.threadpool.ThreadPool; @@ -121,6 +123,13 @@ public class TransportSearchAction extends HandledTransportAction SHARD_COUNT_LIMIT_SETTING = Setting.longSetting( "action.search.shard_count.limit", Long.MAX_VALUE, 1L, Property.Dynamic, Property.NodeScope); + // cluster level setting for timeout based search cancellation. If search request level parameter is present then that will take + // precedence over the cluster setting value + public static final String SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY = "search.cancel_after_time_interval"; + public static final Setting SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING = + Setting.timeSetting(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING_KEY, SearchService.NO_TIMEOUT, Setting.Property.Dynamic, + Setting.Property.NodeScope); + private final NodeClient client; private final ThreadPool threadPool; private final ClusterService clusterService; @@ -239,6 +248,14 @@ long buildTookInMillis() { @Override protected void doExecute(Task task, SearchRequest searchRequest, ActionListener listener) { + // only if task is of type CancellableTask and support cancellation on timeout, treat this request eligible for timeout based + // cancellation. There may be other top level requests like AsyncSearch which is using SearchRequest internally and has it's own + // cancellation mechanism. For such cases, the SearchRequest when created can override the createTask and set the + // cancelAfterTimeInterval to NO_TIMEOUT and bypass this mechanism + if (task instanceof CancellableTask) { + listener = TimeoutTaskCancellationUtility.wrapWithCancellationListener(client, (CancellableTask) task, + clusterService.getClusterSettings(), listener); + } executeRequest(task, searchRequest, this::searchAsyncAction, listener); } diff --git a/server/src/main/java/org/opensearch/action/support/TimeoutTaskCancellationUtility.java b/server/src/main/java/org/opensearch/action/support/TimeoutTaskCancellationUtility.java new file mode 100644 index 0000000000000..a8acc82a7a18e --- /dev/null +++ b/server/src/main/java/org/opensearch/action/support/TimeoutTaskCancellationUtility.java @@ -0,0 +1,135 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.support; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest; +import org.opensearch.client.OriginSettingClient; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.search.SearchService; +import org.opensearch.tasks.CancellableTask; +import org.opensearch.tasks.TaskId; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.opensearch.action.admin.cluster.node.tasks.get.GetTaskAction.TASKS_ORIGIN; +import static org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING; + +public class TimeoutTaskCancellationUtility { + + private static final Logger logger = LogManager.getLogger(TimeoutTaskCancellationUtility.class); + + /** + * Wraps a listener with a timeout listener {@link TimeoutRunnableListener} to schedule the task cancellation for provided tasks on + * generic thread pool + * @param client - {@link NodeClient} + * @param taskToCancel - task to schedule cancellation for + * @param clusterSettings - {@link ClusterSettings} + * @param listener - original listener associated with the task + * @return wrapped listener + */ + public static ActionListener wrapWithCancellationListener(NodeClient client, CancellableTask taskToCancel, + ClusterSettings clusterSettings, ActionListener listener) { + final TimeValue globalTimeout = clusterSettings.get(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING); + final TimeValue timeoutInterval = (taskToCancel.getCancellationTimeout() == null) ? globalTimeout + : taskToCancel.getCancellationTimeout(); + // Note: -1 (or no timeout) will help to turn off cancellation. The combinations will be request level set at -1 or request level + // set to null and cluster level set to -1. + ActionListener listenerToReturn = listener; + if (timeoutInterval.equals(SearchService.NO_TIMEOUT)) { + return listenerToReturn; + } + + try { + final TimeoutRunnableListener wrappedListener = new TimeoutRunnableListener<>(timeoutInterval, listener, () -> { + final CancelTasksRequest cancelTasksRequest = new CancelTasksRequest(); + cancelTasksRequest.setTaskId(new TaskId(client.getLocalNodeId(), taskToCancel.getId())); + cancelTasksRequest.setReason("Cancellation timeout of " + timeoutInterval + " is expired"); + // force the origin to execute the cancellation as a system user + new OriginSettingClient(client, TASKS_ORIGIN).admin().cluster() + .cancelTasks(cancelTasksRequest, ActionListener.wrap(r -> logger.debug( + "Scheduled cancel task with timeout: {} for original task: {} is successfully completed", timeoutInterval, + cancelTasksRequest.getTaskId()), + e -> logger.error(new ParameterizedMessage("Scheduled cancel task with timeout: {} for original task: {} is failed", + timeoutInterval, cancelTasksRequest.getTaskId()), e)) + ); + }); + wrappedListener.cancellable = client.threadPool().schedule(wrappedListener, timeoutInterval, ThreadPool.Names.GENERIC); + listenerToReturn = wrappedListener; + } catch (Exception ex) { + // if there is any exception in scheduling the cancellation task then continue without it + logger.warn("Failed to schedule the cancellation task for original task: {}, will continue without it", taskToCancel.getId()); + } + return listenerToReturn; + } + + /** + * Timeout listener which executes the provided runnable after timeout is expired and if a response/failure is not yet received. + * If either a response/failure is received before timeout then the scheduled task is cancelled and response/failure is sent back to + * the original listener. + */ + private static class TimeoutRunnableListener implements ActionListener, Runnable { + + private static final Logger logger = LogManager.getLogger(TimeoutRunnableListener.class); + + // Runnable to execute after timeout + private final TimeValue timeout; + private final ActionListener originalListener; + private final Runnable timeoutRunnable; + private final AtomicBoolean executeRunnable = new AtomicBoolean(true); + private volatile Scheduler.ScheduledCancellable cancellable; + private final long creationTime; + + TimeoutRunnableListener(TimeValue timeout, ActionListener listener, Runnable runAfterTimeout) { + this.timeout = timeout; + this.originalListener = listener; + this.timeoutRunnable = runAfterTimeout; + this.creationTime = System.nanoTime(); + } + + @Override public void onResponse(Response response) { + checkAndCancel(); + originalListener.onResponse(response); + } + + @Override public void onFailure(Exception e) { + checkAndCancel(); + originalListener.onFailure(e); + } + + @Override public void run() { + try { + if (executeRunnable.compareAndSet(true, false)) { + timeoutRunnable.run(); + } // else do nothing since either response/failure is already sent to client + } catch (Exception ex) { + // ignore the exception + logger.error(new ParameterizedMessage("Ignoring the failure to run the provided runnable after timeout of {} with " + + "exception", timeout), ex); + } + } + + private void checkAndCancel() { + if (executeRunnable.compareAndSet(true, false)) { + logger.debug("Aborting the scheduled cancel task after {}", + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - creationTime)); + // timer has not yet expired so cancel it + cancellable.cancel(); + } + } + } +} diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index d6e6232e04dfc..fdd48fe0ee2af 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -345,6 +345,7 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.DEFAULT_ALLOW_PARTIAL_SEARCH_RESULTS, ElectMasterService.DISCOVERY_ZEN_MINIMUM_MASTER_NODES_SETTING, TransportSearchAction.SHARD_COUNT_LIMIT_SETTING, + TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING, RemoteClusterService.REMOTE_CLUSTER_SKIP_UNAVAILABLE, RemoteClusterService.SEARCH_REMOTE_CLUSTER_SKIP_UNAVAILABLE, SniffConnectionStrategy.REMOTE_CONNECTIONS_PER_CLUSTER, diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java index 869ce4eb3ca26..5ef4e768d0f48 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java @@ -44,6 +44,7 @@ import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.logging.DeprecationLogger; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.XContent; import org.opensearch.common.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentType; @@ -158,6 +159,7 @@ public static MultiSearchRequest parseRequest(RestRequest restRequest, multiRequest.add(searchRequest); }); List requests = multiRequest.requests(); + final TimeValue cancelAfterTimeInterval = restRequest.paramAsTime("cancel_after_time_interval", null); for (SearchRequest request : requests) { // preserve if it's set on the request if (preFilterShardSize != null && request.getPreFilterShardSize() == null) { @@ -166,6 +168,11 @@ public static MultiSearchRequest parseRequest(RestRequest restRequest, if (maxConcurrentShardRequests != null) { request.setMaxConcurrentShardRequests(maxConcurrentShardRequests); } + // if cancel_after_time_interval parameter is set at per search request level than that is used otherwise one set at + // multi search request level will be used + if (request.getCancelAfterTimeInterval() == null) { + request.setCancelAfterTimeInterval(cancelAfterTimeInterval); + } } return multiRequest; } diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java index 6e1823340868f..49a598c813fc4 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java @@ -208,6 +208,8 @@ public static void parseSearchRequest(SearchRequest searchRequest, RestRequest r searchRequest.setCcsMinimizeRoundtrips( request.paramAsBoolean("ccs_minimize_roundtrips", searchRequest.isCcsMinimizeRoundtrips())); } + + searchRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", null)); } /** diff --git a/server/src/main/java/org/opensearch/search/dfs/DfsPhase.java b/server/src/main/java/org/opensearch/search/dfs/DfsPhase.java index d7e127646d372..2bdf4719d8066 100644 --- a/server/src/main/java/org/opensearch/search/dfs/DfsPhase.java +++ b/server/src/main/java/org/opensearch/search/dfs/DfsPhase.java @@ -62,7 +62,7 @@ public void execute(SearchContext context) { @Override public TermStatistics termStatistics(Term term, int docFreq, long totalTermFreq) throws IOException { if (context.isCancelled()) { - throw new TaskCancelledException("cancelled"); + throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled()); } TermStatistics ts = super.termStatistics(term, docFreq, totalTermFreq); if (ts != null) { @@ -74,7 +74,7 @@ public TermStatistics termStatistics(Term term, int docFreq, long totalTermFreq) @Override public CollectionStatistics collectionStatistics(String field) throws IOException { if (context.isCancelled()) { - throw new TaskCancelledException("cancelled"); + throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled()); } CollectionStatistics cs = super.collectionStatistics(field); if (cs != null) { diff --git a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java index 6fab0b1952d67..995afaf650121 100644 --- a/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java +++ b/server/src/main/java/org/opensearch/search/fetch/FetchPhase.java @@ -108,7 +108,7 @@ public void execute(SearchContext context) { } if (context.isCancelled()) { - throw new TaskCancelledException("cancelled"); + throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled()); } if (context.docIdsToLoadSize() == 0) { @@ -140,7 +140,7 @@ public void execute(SearchContext context) { boolean hasSequentialDocs = hasSequentialDocs(docs); for (int index = 0; index < context.docIdsToLoadSize(); index++) { if (context.isCancelled()) { - throw new TaskCancelledException("cancelled"); + throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled()); } int docId = docs[index].docId; try { @@ -181,7 +181,7 @@ public void execute(SearchContext context) { } } if (context.isCancelled()) { - throw new TaskCancelledException("cancelled"); + throw new TaskCancelledException("cancelled task with reason: " + context.getTask().getReasonCancelled()); } TotalHits totalHits = context.queryResult().getTotalHits(); diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhase.java b/server/src/main/java/org/opensearch/search/query/QueryPhase.java index c70f3ee0901d6..8c0e944f16da9 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/opensearch/search/query/QueryPhase.java @@ -126,7 +126,7 @@ public void preProcess(SearchContext context) { cancellation = context.searcher().addQueryCancellation(() -> { SearchShardTask task = context.getTask(); if (task != null && task.isCancelled()) { - throw new TaskCancelledException("cancelled"); + throw new TaskCancelledException("cancelled task with reason: " + task.getReasonCancelled()); } }); } else { @@ -295,7 +295,7 @@ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExe searcher.addQueryCancellation(() -> { SearchShardTask task = searchContext.getTask(); if (task != null && task.isCancelled()) { - throw new TaskCancelledException("cancelled"); + throw new TaskCancelledException("cancelled task with reason: " + task.getReasonCancelled()); } }); } diff --git a/server/src/main/java/org/opensearch/tasks/CancellableTask.java b/server/src/main/java/org/opensearch/tasks/CancellableTask.java index 7c5fe4ea0ffa6..ce12c0a0cfcb7 100644 --- a/server/src/main/java/org/opensearch/tasks/CancellableTask.java +++ b/server/src/main/java/org/opensearch/tasks/CancellableTask.java @@ -33,10 +33,13 @@ package org.opensearch.tasks; import org.opensearch.common.Nullable; +import org.opensearch.common.unit.TimeValue; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; +import static org.opensearch.search.SearchService.NO_TIMEOUT; + /** * A task that can be canceled */ @@ -44,9 +47,16 @@ public abstract class CancellableTask extends Task { private volatile String reason; private final AtomicBoolean cancelled = new AtomicBoolean(false); + private final TimeValue cancelAfterTimeInterval; public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers) { + this(id, type, action, description, parentTaskId, headers, NO_TIMEOUT); + } + + public CancellableTask(long id, String type, String action, String description, TaskId parentTaskId, Map headers, + TimeValue cancelAfterTimeInterval) { super(id, type, action, description, parentTaskId, headers); + this.cancelAfterTimeInterval = cancelAfterTimeInterval; } /** @@ -77,6 +87,10 @@ public boolean isCancelled() { return cancelled.get(); } + public TimeValue getCancellationTimeout() { + return cancelAfterTimeInterval; + } + /** * The reason the task was cancelled or null if it hasn't been cancelled. */ diff --git a/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java b/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java index 588d9a0ce34d5..eb021de7725d1 100644 --- a/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java +++ b/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java @@ -32,6 +32,7 @@ package org.opensearch.action.search; +import org.opensearch.Version; import org.opensearch.action.support.IndicesOptions; import org.opensearch.common.CheckedBiConsumer; import org.opensearch.common.CheckedRunnable; @@ -39,7 +40,9 @@ import org.opensearch.common.Strings; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.logging.DeprecationLogger; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.xcontent.NamedXContentRegistry; import org.opensearch.common.xcontent.XContentBuilder; import org.opensearch.common.xcontent.XContentHelper; @@ -54,6 +57,7 @@ import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.StreamsUtils; +import org.opensearch.test.VersionUtils; import org.opensearch.test.rest.FakeRestRequest; import java.io.IOException; @@ -62,6 +66,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static java.util.Collections.singletonList; import static org.opensearch.search.RandomSearchRequestGenerator.randomSearchRequest; @@ -136,6 +141,38 @@ public void testSimpleAddWithCarriageReturn() throws Exception { assertThat(request.requests().get(0).types().length, equalTo(0)); } + public void testCancelAfterIntervalAtParentAndFewChildRequest() throws Exception { + final String requestContent = "{\"index\":\"test\", \"expand_wildcards\" : \"open,closed\", " + + "\"cancel_after_time_interval\" : \"10s\"}\r\n" + + "{\"query\" : {\"match_all\" :{}}}\r\n {\"search_type\" : \"dfs_query_then_fetch\"}\n" + + "{\"query\" : {\"match_all\" :{}}}\r\n"; + FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()) + .withContent(new BytesArray(requestContent), XContentType.JSON) + .withParams(Collections.singletonMap("cancel_after_time_interval", "20s")) + .build(); + MultiSearchRequest request = RestMultiSearchAction.parseRequest(restRequest, null, true); + assertThat(request.requests().size(), equalTo(2)); + assertThat(request.requests().get(0).indices()[0], equalTo("test")); + // verifies that child search request parameter value is used for first search request + assertEquals(new TimeValue(10, TimeUnit.SECONDS), request.requests().get(0).getCancelAfterTimeInterval()); + // verifies that parent msearch parameter value is used for second search request + assertEquals(request.requests().get(1).searchType(), SearchType.DFS_QUERY_THEN_FETCH); + assertEquals(new TimeValue(20, TimeUnit.SECONDS), request.requests().get(1).getCancelAfterTimeInterval()); + } + + public void testOnlyParentMSearchRequestWithCancelAfterTimeIntervalParameter() throws IOException { + final String requestContent = "{\"index\":\"test\", \"expand_wildcards\" : \"open,closed\"}}\r\n" + + "{\"query\" : {\"match_all\" :{}}}\r\n"; + FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()) + .withContent(new BytesArray(requestContent), XContentType.JSON) + .withParams(Collections.singletonMap("cancel_after_time_interval", "20s")) + .build(); + MultiSearchRequest request = RestMultiSearchAction.parseRequest(restRequest, null, true); + assertThat(request.requests().size(), equalTo(1)); + assertThat(request.requests().get(0).indices()[0], equalTo("test")); + assertEquals(new TimeValue(20, TimeUnit.SECONDS), request.requests().get(0).getCancelAfterTimeInterval()); + } + public void testDefaultIndicesOptions() throws IOException { final String requestContent = "{\"index\":\"test\", \"expand_wildcards\" : \"open,closed\"}}\r\n" + "{\"query\" : {\"match_all\" :{}}}\r\n"; @@ -316,6 +353,12 @@ protected NamedXContentRegistry xContentRegistry() { new ParseField(MatchAllQueryBuilder.NAME), (p, c) -> MatchAllQueryBuilder.fromXContent(p)))); } + @Override + protected NamedWriteableRegistry writableRegistry() { + return new NamedWriteableRegistry(singletonList(new NamedWriteableRegistry.Entry(QueryBuilder.class, + MatchAllQueryBuilder.NAME, MatchAllQueryBuilder::new))); + } + public void testMultiLineSerialization() throws IOException { int iters = 16; for (int i = 0; i < iters; i++) { @@ -338,6 +381,24 @@ public void testMultiLineSerialization() throws IOException { } } + public void testSerDeWithCancelAfterTimeIntervalParameterAndRandomVersion() throws IOException { + final String requestContent = "{\"index\":\"test\", \"expand_wildcards\" : \"open,closed\", " + + "\"cancel_after_time_interval\" : \"10s\"}\r\n{\"query\" : {\"match_all\" :{}}}\r\n"; + FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()) + .withContent(new BytesArray(requestContent), XContentType.JSON) + .build(); + Version version = VersionUtils.randomVersion(random()); + MultiSearchRequest originalRequest = RestMultiSearchAction.parseRequest(restRequest, null, true); + MultiSearchRequest deserializedRequest = copyWriteable(originalRequest, writableRegistry(), MultiSearchRequest::new, version); + + if (version.before(Version.V_1_1_0)) { + assertNull(deserializedRequest.requests().get(0).getCancelAfterTimeInterval()); + } else { + assertEquals(originalRequest.requests().get(0).getCancelAfterTimeInterval(), + deserializedRequest.requests().get(0).getCancelAfterTimeInterval()); + } + } + public void testWritingExpandWildcards() throws IOException { assertExpandWildcardsValue(IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), true, true, true, randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()), "all"); diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java index 24807b2f73b29..29fc6099c4778 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java @@ -115,6 +115,12 @@ public void testRandomVersionSerialization() throws IOException { assertEquals(searchRequest.getAbsoluteStartMillis(), deserializedRequest.getAbsoluteStartMillis()); assertEquals(searchRequest.isFinalReduce(), deserializedRequest.isFinalReduce()); } + + if (version.onOrAfter(Version.V_1_1_0)) { + assertEquals(searchRequest.getCancelAfterTimeInterval(), deserializedRequest.getCancelAfterTimeInterval()); + } else { + assertNull(deserializedRequest.getCancelAfterTimeInterval()); + } } public void testReadFromPre6_7_0() throws IOException { @@ -261,6 +267,8 @@ private SearchRequest mutate(SearchRequest searchRequest) { () -> randomFrom(SearchType.DFS_QUERY_THEN_FETCH, SearchType.QUERY_THEN_FETCH)))); mutators.add(() -> mutation.source(randomValueOtherThan(searchRequest.source(), this::createSearchSourceBuilder))); mutators.add(() -> mutation.setCcsMinimizeRoundtrips(searchRequest.isCcsMinimizeRoundtrips() == false)); + mutators.add(() -> mutation.setCancelAfterTimeInterval(searchRequest.getCancelAfterTimeInterval() != null + ? null : TimeValue.parseTimeValue(randomTimeValue(), null, "cancel_after_time_interval"))); randomFrom(mutators).run(); return mutation; } diff --git a/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java b/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java index 1ea6f8c58eae7..a873fb04e81b3 100644 --- a/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java +++ b/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java @@ -130,6 +130,10 @@ public static SearchRequest randomSearchRequest(Supplier ra if (randomBoolean()) { searchRequest.source(randomSearchSourceBuilder.get()); } + if (randomBoolean()) { + searchRequest.setCancelAfterTimeInterval( + TimeValue.parseTimeValue(randomTimeValue(), null, "cancel_after_time_interval")); + } return searchRequest; }