From 3e56548b6ed8cb5747e9dd7e7c4a1ab09136f0bf Mon Sep 17 00:00:00 2001 From: kkewwei Date: Sun, 24 Nov 2024 12:03:27 +0800 Subject: [PATCH] Coordinator can return partial results after the timeout when allow_partial_search_results is true Signed-off-by: kkewwei Signed-off-by: kkewwei --- CHANGELOG.md | 1 + .../action/search/MultiSearchRequest.java | 5 ++ .../action/search/SearchRequest.java | 32 ++++++++- .../opensearch/action/search/SearchTask.java | 11 ++- .../action/search/SearchTransportService.java | 19 ++++- .../action/search/RestMultiSearchAction.java | 4 ++ .../rest/action/search/RestSearchAction.java | 1 + .../AbstractSearchAsyncActionTests.java | 71 ++++++++++++++++++- .../search/MultiSearchRequestTests.java | 31 ++++++++ .../action/search/SearchRequestTests.java | 10 +++ .../search/RandomSearchRequestGenerator.java | 3 + 11 files changed, 179 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 70245afda0dd1..8b0d14d0889cb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,6 +21,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Support prefix list for remote repository attributes([#16271](https://github.com/opensearch-project/OpenSearch/pull/16271)) - Add new configuration setting `synonym_analyzer`, to the `synonym` and `synonym_graph` filters, enabling the specification of a custom analyzer for reading the synonym file ([#16488](https://github.com/opensearch-project/OpenSearch/pull/16488)). - Add stats for remote publication failure and move download failure stats to remote methods([#16682](https://github.com/opensearch-project/OpenSearch/pull/16682/)) +- Coordinator can return partial results after the timeout when allow_partial_search_results is true ([#16681](https://github.com/opensearch-project/OpenSearch/pull/16681)). ### Dependencies - Bump `com.google.cloud:google-cloud-core-http` from 2.23.0 to 2.47.0 ([#16504](https://github.com/opensearch-project/OpenSearch/pull/16504)) diff --git a/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java b/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java index f16d7d1e7d6a3..8a2fae9779f3b 100644 --- a/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/MultiSearchRequest.java @@ -281,6 +281,8 @@ public static void readMultiLineFormat( searchRequest.setCancelAfterTimeInterval(nodeTimeValue(value, null)); } else if ("phase_took".equals(entry.getKey())) { searchRequest.setPhaseTook(nodeBooleanValue(value)); + } else if ("coordinator_timeout".equals(entry.getKey())) { + searchRequest.setCoordinatorTimeout(nodeTimeValue(value)); } else { throw new IllegalArgumentException("key [" + entry.getKey() + "] is not supported in the metadata section"); } @@ -385,6 +387,9 @@ public static void writeSearchRequestParams(SearchRequest request, XContentBuild if (request.isPhaseTook() != null) { xContentBuilder.field("phase_took", request.isPhaseTook()); } + if (request.getCoordinatorTimeout() != null) { + xContentBuilder.field("coordinator_timeout", request.getCoordinatorTimeout().getStringRep()); + } xContentBuilder.endObject(); } diff --git a/server/src/main/java/org/opensearch/action/search/SearchRequest.java b/server/src/main/java/org/opensearch/action/search/SearchRequest.java index 4d3bb868b779a..436cf10091eb4 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchRequest.java +++ b/server/src/main/java/org/opensearch/action/search/SearchRequest.java @@ -59,6 +59,7 @@ import java.util.Objects; import static org.opensearch.action.ValidateActions.addValidationError; +import static org.opensearch.search.SearchService.NO_TIMEOUT; /** * A request to execute search against one or more indices (or all). Best created using @@ -123,6 +124,9 @@ public class SearchRequest extends ActionRequest implements IndicesRequest.Repla private Boolean phaseTook = null; + // it's only been used in coordinator, so we don't need to serialize/deserialize it + private TimeValue coordinatorTimeout = null; + public SearchRequest() { this.localClusterAlias = null; this.absoluteStartMillis = DEFAULT_ABSOLUTE_START_MILLIS; @@ -228,6 +232,7 @@ private SearchRequest( this.finalReduce = finalReduce; this.cancelAfterTimeInterval = searchRequest.cancelAfterTimeInterval; this.phaseTook = searchRequest.phaseTook; + this.coordinatorTimeout = searchRequest.coordinatorTimeout; } /** @@ -275,6 +280,7 @@ public SearchRequest(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_2_12_0)) { phaseTook = in.readOptionalBoolean(); } + coordinatorTimeout = null; } @Override @@ -341,6 +347,13 @@ public ActionRequestValidationException validate() { if (source.aggregations() != null) { validationException = source.aggregations().validate(validationException); } + if (source.timeout() != null && coordinatorTimeout != null && source.timeout().compareTo(coordinatorTimeout) < 0) { + validationException = addValidationError( + "timeout [" + source.timeout() + "] cannot be smaller than coordinator timeout [" + coordinatorTimeout + "]", + validationException + ); + + } } if (pointInTimeBuilder() != null) { if (scroll) { @@ -711,9 +724,18 @@ public String pipeline() { return pipeline; } + public void setCoordinatorTimeout(TimeValue coordinatorTimeout) { + assert coordinatorTimeout != NO_TIMEOUT; + this.coordinatorTimeout = coordinatorTimeout; + } + + public TimeValue getCoordinatorTimeout() { + return coordinatorTimeout; + } + @Override public SearchTask createTask(long id, String type, String action, TaskId parentTaskId, Map headers) { - return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval); + return new SearchTask(id, type, action, this::buildDescription, parentTaskId, headers, cancelAfterTimeInterval, coordinatorTimeout); } public final String buildDescription() { @@ -765,7 +787,8 @@ public boolean equals(Object o) { && ccsMinimizeRoundtrips == that.ccsMinimizeRoundtrips && Objects.equals(cancelAfterTimeInterval, that.cancelAfterTimeInterval) && Objects.equals(pipeline, that.pipeline) - && Objects.equals(phaseTook, that.phaseTook); + && Objects.equals(phaseTook, that.phaseTook) + && Objects.equals(coordinatorTimeout, that.coordinatorTimeout); } @Override @@ -787,7 +810,8 @@ public int hashCode() { absoluteStartMillis, ccsMinimizeRoundtrips, cancelAfterTimeInterval, - phaseTook + phaseTook, + coordinatorTimeout ); } @@ -832,6 +856,8 @@ public String toString() { + pipeline + ", phaseTook=" + phaseTook + + ", coordinatorTimeout=" + + coordinatorTimeout + "}"; } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchTask.java b/server/src/main/java/org/opensearch/action/search/SearchTask.java index 2a1a961e7607b..9aa64d97e18a3 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTask.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTask.java @@ -53,6 +53,7 @@ public class SearchTask extends QueryGroupTask implements SearchBackpressureTask // generating description in a lazy way since source can be quite big private final Supplier descriptionSupplier; private SearchProgressListener progressListener = SearchProgressListener.NOOP; + private final TimeValue coordinatorTimeout; public SearchTask( long id, @@ -62,7 +63,7 @@ public SearchTask( TaskId parentTaskId, Map headers ) { - this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT); + this(id, type, action, descriptionSupplier, parentTaskId, headers, NO_TIMEOUT, null); } public SearchTask( @@ -72,10 +73,12 @@ public SearchTask( Supplier descriptionSupplier, TaskId parentTaskId, Map headers, - TimeValue cancelAfterTimeInterval + TimeValue cancelAfterTimeInterval, + TimeValue coordinatorTimeout ) { super(id, type, action, null, parentTaskId, headers, cancelAfterTimeInterval); this.descriptionSupplier = descriptionSupplier; + this.coordinatorTimeout = coordinatorTimeout; } @Override @@ -106,4 +109,8 @@ public final SearchProgressListener getProgressListener() { public boolean shouldCancelChildrenOnCancellation() { return true; } + + public TimeValue getCoordinatorTimeout() { + return coordinatorTimeout; + } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java index 64c738f633f2e..aa3cdfd99ab1d 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchTransportService.java +++ b/server/src/main/java/org/opensearch/action/search/SearchTransportService.java @@ -39,6 +39,7 @@ import org.opensearch.action.support.IndicesOptions; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.Nullable; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.StreamInput; @@ -172,7 +173,7 @@ public void createPitContext( CREATE_READER_CONTEXT_ACTION_NAME, request, task, - TransportRequestOptions.EMPTY, + getTransportRequestOptions(task.getCoordinatorTimeout()), new ActionListenerResponseHandler<>(actionListener, TransportCreatePitAction.CreateReaderContextResponse::new) ); } @@ -188,7 +189,7 @@ public void sendCanMatch( QUERY_CAN_MATCH_NAME, request, task, - TransportRequestOptions.EMPTY, + getTransportRequestOptions(task.getCoordinatorTimeout()), new ActionListenerResponseHandler<>(listener, SearchService.CanMatchResponse::new) ); } @@ -228,6 +229,7 @@ public void sendExecuteDfs( DFS_ACTION_NAME, request, task, + getTransportRequestOptions(task.getCoordinatorTimeout()), new ConnectionCountingHandler<>(listener, DfsSearchResult::new, clientConnections, connection.getNode().getId()) ); } @@ -249,6 +251,7 @@ public void sendExecuteQuery( QUERY_ACTION_NAME, request, task, + getTransportRequestOptions(task.getCoordinatorTimeout()), new ConnectionCountingHandler<>(handler, reader, clientConnections, connection.getNode().getId()) ); } @@ -264,6 +267,7 @@ public void sendExecuteQuery( QUERY_ID_ACTION_NAME, request, task, + getTransportRequestOptions(task.getCoordinatorTimeout()), new ConnectionCountingHandler<>(listener, QuerySearchResult::new, clientConnections, connection.getNode().getId()) ); } @@ -279,6 +283,7 @@ public void sendExecuteScrollQuery( QUERY_SCROLL_ACTION_NAME, request, task, + getTransportRequestOptions(task.getCoordinatorTimeout()), new ConnectionCountingHandler<>(listener, ScrollQuerySearchResult::new, clientConnections, connection.getNode().getId()) ); } @@ -328,6 +333,7 @@ private void sendExecuteFetch( action, request, task, + getTransportRequestOptions(task.getCoordinatorTimeout()), new ConnectionCountingHandler<>(listener, FetchSearchResult::new, clientConnections, connection.getNode().getId()) ); } @@ -342,10 +348,19 @@ void sendExecuteMultiSearch(final MultiSearchRequest request, SearchTask task, f MultiSearchAction.NAME, request, task, + getTransportRequestOptions(task.getCoordinatorTimeout()), new ConnectionCountingHandler<>(listener, MultiSearchResponse::new, clientConnections, connection.getNode().getId()) ); } + static TransportRequestOptions getTransportRequestOptions(TimeValue coordinatorTimeout) { + if (coordinatorTimeout != null) { + return TransportRequestOptions.builder().withTimeout(coordinatorTimeout).build(); + } else { + return TransportRequestOptions.EMPTY; + } + } + public RemoteClusterService getRemoteClusterService() { return transportService.getRemoteClusterService(); } diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java index 4b11670450727..a340a5eca1ee7 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestMultiSearchAction.java @@ -157,6 +157,7 @@ public static MultiSearchRequest parseRequest( multiRequest.add(searchRequest); }); List requests = multiRequest.requests(); + final TimeValue coordinatorTimeout = restRequest.paramAsTime("coordinator_timeout", null); final TimeValue cancelAfterTimeInterval = restRequest.paramAsTime("cancel_after_time_interval", null); for (SearchRequest request : requests) { // preserve if it's set on the request @@ -171,6 +172,9 @@ public static MultiSearchRequest parseRequest( if (request.getCancelAfterTimeInterval() == null) { request.setCancelAfterTimeInterval(cancelAfterTimeInterval); } + if (request.getCoordinatorTimeout() == null) { + request.setCoordinatorTimeout(coordinatorTimeout); + } } return multiRequest; } diff --git a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java index 05465e32631fd..d5131a9c869e8 100644 --- a/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java +++ b/server/src/main/java/org/opensearch/rest/action/search/RestSearchAction.java @@ -224,6 +224,7 @@ public static void parseSearchRequest( } searchRequest.setCancelAfterTimeInterval(request.paramAsTime("cancel_after_time_interval", null)); + searchRequest.setCoordinatorTimeout(request.paramAsTime("coordinator_timeout", null)); } /** diff --git a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java index 27336e86e52b0..aa8eced5d27e5 100644 --- a/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/AbstractSearchAsyncActionTests.java @@ -36,11 +36,13 @@ import org.opensearch.action.OriginalIndices; import org.opensearch.action.support.IndicesOptions; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.GroupShardsIterator; import org.opensearch.common.UUIDs; import org.opensearch.common.collect.Tuple; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.set.Sets; @@ -55,6 +57,7 @@ import org.opensearch.index.shard.ShardNotFoundException; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.search.internal.ShardSearchContextId; @@ -65,6 +68,7 @@ import org.opensearch.test.OpenSearchTestCase; import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.ReceiveTimeoutTransportException; import org.opensearch.transport.Transport; import org.junit.After; import org.junit.Before; @@ -89,6 +93,9 @@ import java.util.function.BiFunction; import java.util.stream.IntStream; +import org.mockito.Mockito; + +import static org.opensearch.action.search.SearchTransportService.QUERY_ACTION_NAME; import static org.opensearch.tasks.TaskResourceTrackingService.TASK_RESOURCE_USAGE; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; @@ -138,6 +145,7 @@ private AbstractSearchAsyncAction createAction( false, expected, resourceUsage, + false, new SearchShardIterator(null, null, Collections.emptyList(), null) ); } @@ -151,6 +159,7 @@ private AbstractSearchAsyncAction createAction( final boolean catchExceptionWhenExecutePhaseOnShard, final AtomicLong expected, final TaskResourceUsage resourceUsage, + final boolean blockTheFirstQueryPhase, final SearchShardIterator... shards ) { @@ -179,7 +188,7 @@ private AbstractSearchAsyncAction createAction( .setNodeId(randomAlphaOfLengthBetween(1, 5)) .build(); threadPool.getThreadContext().addResponseHeader(TASK_RESOURCE_USAGE, taskResourceInfo.toString()); - + AtomicBoolean firstShard = new AtomicBoolean(true); return new AbstractSearchAsyncAction( "test", logger, @@ -207,7 +216,17 @@ private AbstractSearchAsyncAction createAction( ) { @Override protected SearchPhase getNextPhase(final SearchPhaseResults results, SearchPhaseContext context) { - return null; + if (blockTheFirstQueryPhase) { + return new SearchPhase("test") { + @Override + public void run() { + listener.onResponse(new SearchResponse(null, null, 0, 0, 0, 0, null, null)); + assertingListener.onPhaseEnd(context, null); + } + }; + } else { + return null; + } } @Override @@ -218,6 +237,16 @@ protected void executePhaseOnShard( ) { if (failExecutePhaseOnShard) { listener.onFailure(new ShardNotFoundException(shardIt.shardId())); + } else if (blockTheFirstQueryPhase && firstShard.compareAndSet(true, false)) { + // Sleep and throw ReceiveTimeoutTransportException to simulate node blocked + try { + Thread.sleep(request.getCoordinatorTimeout().millis()); + } catch (InterruptedException e) {} + DiscoveryNode node = Mockito.mock(DiscoveryNode.class); + Mockito.when(node.getName()).thenReturn("test_nodes"); + listener.onFailure( + new ReceiveTimeoutTransportException(node, QUERY_ACTION_NAME, "request_id [171] timed out after [413ms]") + ); } else { if (catchExceptionWhenExecutePhaseOnShard) { try { @@ -227,6 +256,7 @@ protected void executePhaseOnShard( } } else { listener.onResponse(new QuerySearchResult()); + } } } @@ -587,6 +617,7 @@ public void onFailure(Exception e) { false, new AtomicLong(), new TaskResourceUsage(randomLong(), randomLong()), + false, shards ); action.run(); @@ -635,6 +666,7 @@ public void onFailure(Exception e) { false, new AtomicLong(), new TaskResourceUsage(randomLong(), randomLong()), + false, shards ); action.run(); @@ -688,6 +720,7 @@ public void onFailure(Exception e) { catchExceptionWhenExecutePhaseOnShard, new AtomicLong(), new TaskResourceUsage(randomLong(), randomLong()), + false, shards ); action.run(); @@ -791,6 +824,40 @@ public void testOnPhaseListenersWithDfsType() throws InterruptedException { assertEquals(0, testListener.getPhaseCurrent(searchDfsQueryThenFetchAsyncAction.getSearchPhaseName())); } + public void testExecutePhaseOnShardBlockAndRetrunPartialResult() { + // on shard is blocked in query phase + final Index index = new Index("test", UUID.randomUUID().toString()); + + final SearchShardIterator[] shards = IntStream.range(0, 2 + randomInt(4)) + .mapToObj(i -> new SearchShardIterator(null, new ShardId(index, i), List.of("n1"), null, null, null)) + .toArray(SearchShardIterator[]::new); + + SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); + searchRequest.source(new SearchSourceBuilder()); + long timeoutMills = 500; + searchRequest.setCoordinatorTimeout(new TimeValue(timeoutMills, TimeUnit.MILLISECONDS)); + searchRequest.setMaxConcurrentShardRequests(shards.length); + final AtomicBoolean successed = new AtomicBoolean(false); + long current = System.currentTimeMillis(); + + final ArraySearchPhaseResults queryResult = new ArraySearchPhaseResults<>(shards.length); + AbstractSearchAsyncAction action = createAction(searchRequest, queryResult, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + successed.set(true); + } + + @Override + public void onFailure(Exception e) { + successed.set(false); + } + }, false, false, false, new AtomicLong(), new TaskResourceUsage(randomLong(), randomLong()), true, shards); + action.run(); + long s = System.currentTimeMillis() - current; + assertTrue(s > timeoutMills); + assertTrue(successed.get()); + } + private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAction( List searchRequestOperationsListeners ) { diff --git a/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java b/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java index 2577dfdc20698..57d51a6d4ab30 100644 --- a/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java +++ b/server/src/test/java/org/opensearch/action/search/MultiSearchRequestTests.java @@ -180,6 +180,37 @@ public void testOnlyParentMSearchRequestWithCancelAfterTimeIntervalParameter() t assertEquals(new TimeValue(20, TimeUnit.SECONDS), request.requests().get(0).getCancelAfterTimeInterval()); } + public void tesCoordinatorTimeoutAtParentAndFewChildRequest() throws IOException { + final String requestContent = "{\"index\":\"test\", \"expand_wildcards\" : \"open,closed\", " + + "\"coordinator_timeout\" : \"10s\"}\r\n" + + "{\"query\" : {\"match_all\" :{}}}\r\n {\"search_type\" : \"dfs_query_then_fetch\"}\n" + + "{\"query\" : {\"match_all\" :{}}}\r\n"; + FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()).withContent( + new BytesArray(requestContent), + XContentType.JSON + ).withParams(Collections.singletonMap("coordinator_timeout", "20s")).build(); + MultiSearchRequest request = RestMultiSearchAction.parseRequest(restRequest, null, true); + assertThat(request.requests().size(), equalTo(2)); + assertThat(request.requests().get(0).indices()[0], equalTo("test")); + // verifies that child search request parameter value is used for first search request + assertEquals(new TimeValue(10, TimeUnit.SECONDS), request.requests().get(0).getCoordinatorTimeout()); + // verifies that parent msearch parameter value is used for second search request + assertEquals(new TimeValue(20, TimeUnit.SECONDS), request.requests().get(1).getCoordinatorTimeout()); + } + + public void testOnlyParentMSearchRequestWithCoordinatorTimeoutParameter() throws IOException { + final String requestContent = "{\"index\":\"test\", \"expand_wildcards\" : \"open,closed\"}}\r\n" + + "{\"query\" : {\"match_all\" :{}}}\r\n"; + FakeRestRequest restRequest = new FakeRestRequest.Builder(xContentRegistry()).withContent( + new BytesArray(requestContent), + XContentType.JSON + ).withParams(Collections.singletonMap("coordinator_timeout", "20s")).build(); + MultiSearchRequest request = RestMultiSearchAction.parseRequest(restRequest, null, true); + assertThat(request.requests().size(), equalTo(1)); + assertThat(request.requests().get(0).indices()[0], equalTo("test")); + assertEquals(new TimeValue(20, TimeUnit.SECONDS), request.requests().get(0).getCoordinatorTimeout()); + } + public void testDefaultIndicesOptions() throws IOException { final String requestContent = "{\"index\":\"test\", \"expand_wildcards\" : \"open,closed\"}}\r\n" + "{\"query\" : {\"match_all\" :{}}}\r\n"; diff --git a/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java b/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java index acda1445bacbb..7b015fd837064 100644 --- a/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java +++ b/server/src/test/java/org/opensearch/action/search/SearchRequestTests.java @@ -238,6 +238,16 @@ public void testValidate() throws IOException { assertEquals(1, validationErrors.validationErrors().size()); assertEquals("using [point in time] is not allowed in a scroll context", validationErrors.validationErrors().get(0)); } + + { + // timeout must be smaller than coordinator_timeout + SearchRequest searchRequest = createSearchRequest().source(new SearchSourceBuilder().timeout(TimeValue.timeValueMillis(10))); + searchRequest.setCoordinatorTimeout(TimeValue.timeValueMillis(100)); + ActionRequestValidationException validationErrors = searchRequest.validate(); + assertNotNull(validationErrors); + assertEquals(1, validationErrors.validationErrors().size()); + assertEquals("[timeout] must be smaller than [coordinator_timeout] (100ms)", validationErrors.validationErrors().get(0)); + } } public void testCopyConstructor() throws IOException { diff --git a/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java b/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java index 74de1e6d96d93..25ba9e33e32c8 100644 --- a/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java +++ b/test/framework/src/main/java/org/opensearch/search/RandomSearchRequestGenerator.java @@ -134,6 +134,9 @@ public static SearchRequest randomSearchRequest(Supplier ra if (randomBoolean()) { searchRequest.setPhaseTook(randomBoolean()); } + if (randomBoolean()) { + searchRequest.setCoordinatorTimeout(TimeValue.parseTimeValue(randomTimeValue(), null, "coordinator_timeout")); + } return searchRequest; }