diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java index cc2f94e3af9a3..e0d8f7316d1d2 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/basic/SearchRedStateIndexIT.java @@ -20,9 +20,11 @@ package org.elasticsearch.search.basic; +import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.routing.ShardRouting; @@ -36,10 +38,13 @@ import java.util.List; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.lessThanOrEqualTo; @ESIntegTestCase.ClusterScope(minNumDataNodes = 2) public class SearchRedStateIndexIT extends ESIntegTestCase { @@ -52,10 +57,13 @@ public void testAllowPartialsWithRedState() throws Exception { SearchResponse searchResponse = client().prepareSearch().setSize(0).setAllowPartialSearchResults(true) .get(); assertThat(RestStatus.OK, equalTo(searchResponse.status())); - assertThat("Expect no shards failed", searchResponse.getFailedShards(), equalTo(0)); + assertThat("Expect some shards failed", searchResponse.getFailedShards(), allOf(greaterThan(0), lessThanOrEqualTo(numShards))); assertThat("Expect no shards skipped", searchResponse.getSkippedShards(), equalTo(0)); assertThat("Expect subset of shards successful", searchResponse.getSuccessfulShards(), lessThan(numShards)); assertThat("Expected total shards", searchResponse.getTotalShards(), equalTo(numShards)); + for (ShardSearchFailure failure : searchResponse.getShardFailures()) { + assertThat(failure.getCause(), instanceOf(NoShardAvailableActionException.class)); + } } public void testClusterAllowPartialsWithRedState() throws Exception { @@ -66,10 +74,13 @@ public void testClusterAllowPartialsWithRedState() throws Exception { SearchResponse searchResponse = client().prepareSearch().setSize(0).get(); assertThat(RestStatus.OK, equalTo(searchResponse.status())); - assertThat("Expect no shards failed", searchResponse.getFailedShards(), equalTo(0)); + assertThat("Expect some shards failed", searchResponse.getFailedShards(), allOf(greaterThan(0), lessThanOrEqualTo(numShards))); assertThat("Expect no shards skipped", searchResponse.getSkippedShards(), equalTo(0)); assertThat("Expect subset of shards successful", searchResponse.getSuccessfulShards(), lessThan(numShards)); assertThat("Expected total shards", searchResponse.getTotalShards(), equalTo(numShards)); + for (ShardSearchFailure failure : searchResponse.getShardFailures()) { + assertThat(failure.getCause(), instanceOf(NoShardAvailableActionException.class)); + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index 0653d79f31e84..1adc37ac6c1b5 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -32,7 +32,6 @@ import org.elasticsearch.action.support.TransportActions; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.routing.GroupShardsIterator; -import org.elasticsearch.common.Nullable; import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.util.concurrent.AbstractRunnable; @@ -235,7 +234,9 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator * we can continue (cf. InitialSearchPhase#maybeFork). */ if (shard == null) { - fork(() -> onShardFailure(shardIndex, null, shardIt, new NoShardAvailableActionException(shardIt.shardId()))); + SearchShardTarget unassignedShard = new SearchShardTarget(null, shardIt.shardId(), + shardIt.getClusterAlias(), shardIt.getOriginalIndices()); + fork(() -> onShardFailure(shardIndex, unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId()))); } else { final PendingExecutions pendingExecutions = throttleConcurrentRequests ? pendingExecutionsPerNode.computeIfAbsent(shard.getNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode)) @@ -386,14 +387,13 @@ ShardSearchFailure[] buildShardFailures() { return failures; } - private void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) { + private void onShardFailure(final int shardIndex, SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) { // we always add the shard failure for a specific shard instance // we do make sure to clean it on a successful response from a shard onShardFailure(shardIndex, shard, e); final SearchShardTarget nextShard = shardIt.nextOrNull(); final boolean lastShard = nextShard == null; - logger.debug(() -> new ParameterizedMessage("{}: Failed to execute [{}] lastShard [{}]", - shard != null ? shard : shardIt.shardId(), request, lastShard), e); + logger.debug(() -> new ParameterizedMessage("{}: Failed to execute [{}] lastShard [{}]", shard, request, lastShard), e); if (lastShard) { if (request.allowPartialSearchResults() == false) { if (requestCancelled.compareAndSet(false, true)) { @@ -437,10 +437,15 @@ protected void onShardGroupFailure(int shardIndex, SearchShardTarget shardTarget * @param e the failure reason */ @Override - public final void onShardFailure(final int shardIndex, @Nullable SearchShardTarget shardTarget, Exception e) { - // we don't aggregate shard failures on non active shards and failures due to the internal cancellation, + public final void onShardFailure(final int shardIndex, SearchShardTarget shardTarget, Exception e) { + if (TransportActions.isShardNotAvailableException(e)) { + // Groups shard not available exceptions under a generic exception that returns a SERVICE_UNAVAILABLE(503) + // temporary error. + e = new NoShardAvailableActionException(shardTarget.getShardId(), e.getMessage()); + } + // we don't aggregate shard on failures due to the internal cancellation, // but do keep the header counts right - if (TransportActions.isShardNotAvailableException(e) == false && (requestCancelled.get() && isTaskCancelledException(e)) == false) { + if ((requestCancelled.get() && isTaskCancelledException(e)) == false) { AtomicArray shardFailures = this.shardFailures.get(); // lazily create shard failures, so we can early build the empty shard failure list in most cases (no failures) if (shardFailures == null) { // this is double checked locking but it's fine since SetOnce uses a volatile read internally @@ -545,7 +550,11 @@ public final SearchRequest getRequest() { protected final SearchResponse buildSearchResponse(InternalSearchResponse internalSearchResponse, ShardSearchFailure[] failures, String scrollId, String searchContextId) { - return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), successfulOps.get(), + int numSuccess = successfulOps.get(); + int numFailures = failures.length; + assert numSuccess + numFailures == getNumShards() + : "numSuccess(" + numSuccess + ") + numFailures(" + numFailures + ") != totalShards(" + getNumShards() + ")"; + return new SearchResponse(internalSearchResponse, scrollId, getNumShards(), numSuccess, skippedOps.get(), buildTookInMillis(), failures, clusters, searchContextId); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java index 6cf10742dc303..9a110138a458d 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchResponse.java @@ -204,8 +204,6 @@ public int getSkippedShards() { * The failed number of shards the search was executed on. */ public int getFailedShards() { - // we don't return totalShards - successfulShards, we don't count "no shards available" as a failed shard, just don't - // count it in the successful counter return shardFailures.length; } diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index 9f1199f774a63..46b1e057c41a2 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -160,33 +160,6 @@ public void testBuildShardSearchTransportRequest() { assertEquals(clusterAlias, shardSearchTransportRequest.getClusterAlias()); } - public void testBuildSearchResponse() { - SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(randomBoolean()); - ArraySearchPhaseResults phaseResults = new ArraySearchPhaseResults<>(10); - AbstractSearchAsyncAction action = createAction(searchRequest, - phaseResults, null, false, new AtomicLong()); - InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty(); - SearchResponse searchResponse = action.buildSearchResponse(internalSearchResponse, action.buildShardFailures(), null, null); - assertSame(searchResponse.getAggregations(), internalSearchResponse.aggregations()); - assertSame(searchResponse.getSuggest(), internalSearchResponse.suggest()); - assertSame(searchResponse.getProfileResults(), internalSearchResponse.profile()); - assertSame(searchResponse.getHits(), internalSearchResponse.hits()); - } - - public void testBuildSearchResponseAllowPartialFailures() { - SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(true); - final ArraySearchPhaseResults queryResult = new ArraySearchPhaseResults<>(10); - AbstractSearchAsyncAction action = createAction(searchRequest, queryResult, null, false, new AtomicLong()); - action.onShardFailure(0, new SearchShardTarget("node", new ShardId("index", "index-uuid", 0), null, OriginalIndices.NONE), - new IllegalArgumentException()); - InternalSearchResponse internalSearchResponse = InternalSearchResponse.empty(); - SearchResponse searchResponse = action.buildSearchResponse(internalSearchResponse, action.buildShardFailures(), null, null); - assertSame(searchResponse.getAggregations(), internalSearchResponse.aggregations()); - assertSame(searchResponse.getSuggest(), internalSearchResponse.suggest()); - assertSame(searchResponse.getProfileResults(), internalSearchResponse.profile()); - assertSame(searchResponse.getHits(), internalSearchResponse.hits()); - } - public void testSendSearchResponseDisallowPartialFailures() { SearchRequest searchRequest = new SearchRequest().allowPartialSearchResults(false); AtomicReference exception = new AtomicReference<>(); diff --git a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java index 6f576567eb15a..5e72f4bda157b 100644 --- a/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java +++ b/x-pack/plugin/async-search/src/internalClusterTest/java/org/elasticsearch/xpack/search/AsyncSearchActionIT.java @@ -447,13 +447,12 @@ public void testRemoveAsyncIndex() throws Exception { ensureTaskRemoval(newResp.getId()); } - public void testSearchPhaseFailureNoCause() throws Exception { + public void testSearchPhaseFailure() throws Exception { SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName); request.setKeepOnCompletion(true); request.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10)); request.getSearchRequest().allowPartialSearchResults(false); request.getSearchRequest() - // AlreadyClosedException are ignored by the coordinating node .source(new SearchSourceBuilder().query(new ThrowingQueryBuilder(randomLong(), new AlreadyClosedException("boom"), 0))); AsyncSearchResponse response = submitAsyncSearch(request); assertFalse(response.isRunning());