From 3ee5c4f3aedb8af47365edfb42ccdb0ea6679348 Mon Sep 17 00:00:00 2001
From: Henning Andersen <33268011+henningandersen@users.noreply.github.com>
Date: Mon, 19 Aug 2019 07:57:01 +0200
Subject: [PATCH] Reindex search resiliency (#45497)

Local reindex can now survive losing data nodes that contain source data. The original query will be restarted with a filter for `_seq_no >= last_seq_no` when a failure is detected (see the illustrative sketch after the diff). The original/first search request is not retried/restarted, since that was not our previous behavior, and retrying it would mean a long wait before learning that a search request is bad.

Part of #42612 and split out from #43187
---
 .../AbstractAsyncBulkByScrollAction.java | 12 +-
 .../reindex/AsyncDeleteByQueryAction.java | 3 +-
 .../index/reindex/Reindexer.java | 42 ++-
 .../reindex/TransportUpdateByQueryAction.java | 2 +-
 .../remote/RemoteScrollableHitSource.java | 20 +-
 .../documentation/ReindexDocumentationIT.java | 4 +
 .../reindex/AsyncBulkByScrollActionTests.java | 7 +-
 .../ClientScrollableHitSourceTests.java | 265 ++++++++++++++----
 .../index/reindex/ReindexBasicTests.java | 5 +
 .../index/reindex/ReindexFailureTests.java | 12 +
 .../index/reindex/ReindexMetadataTests.java | 2 +-
 .../reindex/ReindexResilientSearchIT.java | 151 ++++++++++
 .../index/reindex/ReindexScriptTests.java | 2 +-
 .../index/reindex/ReindexTestCase.java | 7 +-
 .../RemoteScrollableHitSourceTests.java | 12 +-
 .../test/reindex/35_search_failures.yml | 24 +-
 .../reindex/ClientScrollableHitSource.java | 57 +++-
 .../index/reindex/RetryListener.java | 46 ++-
 .../index/reindex/ScrollableHitSource.java | 132 ++++++++-
 .../search/builder/SearchSourceBuilder.java | 7 +
 20 files changed, 691 insertions(+), 121 deletions(-)
 create mode 100644 modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexResilientSearchIT.java

diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java index aeca7caf2d446..eef1f19b5039c 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AbstractAsyncBulkByScrollAction.java @@ -119,7 +119,8 @@ public abstract class AbstractAsyncBulkByScrollAction listener, - @Nullable ScriptService scriptService, @Nullable ReindexSslConfig sslConfig) { + @Nullable ScriptService scriptService, @Nullable ReindexSslConfig sslConfig, + @Nullable String restartFromField) { this.task = task; this.scriptService = scriptService; this.sslConfig = sslConfig; @@ -135,7 +136,9 @@ public abstract class AbstractAsyncBulkByScrollAction docs) return bulkRequest; } - protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) { + protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy, + String restartFromField) { return new ClientScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry, this::onScrollResponse, this::finishHim, client, - mainRequest.getSearchRequest()); + mainRequest.getSearchRequest(), restartFromField); } /** diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java index 7fd18306ddc47..82f88fc25becd 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java +++ 
b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/AsyncDeleteByQueryAction.java @@ -34,7 +34,8 @@ public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction listener) { - super(task, false, true, logger, client, threadPool, request, listener, scriptService, null); + super(task, false, true, logger, client, threadPool, request, listener, + scriptService, null, null); } @Override diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/Reindexer.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/Reindexer.java index 695b659443d98..cf6ed654d4c07 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/Reindexer.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/Reindexer.java @@ -33,6 +33,7 @@ import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkItemResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.client.RestClient; @@ -47,10 +48,14 @@ import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.VersionType; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper; import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource; import org.elasticsearch.script.Script; import org.elasticsearch.script.ScriptService; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.elasticsearch.search.sort.SortBuilder; +import org.elasticsearch.search.sort.SortOrder; import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; @@ -92,17 +97,42 @@ public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListen } public void execute(BulkByScrollTask task, ReindexRequest request, ActionListener listener) { + request.getSearchRequest().allowPartialSearchResults(false); + // Notice that this is called both on leader and workers when slicing. + String resumableSortingField = request.getRemoteInfo() == null ? getOrAddRestartFromField(request.getSearchRequest()) : null; + BulkByScrollParallelizationHelper.executeSlicedAction(task, request, ReindexAction.INSTANCE, listener, client, clusterService.localNode(), () -> { ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task); AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(task, logger, assigningClient, threadPool, - scriptService, reindexSslConfig, request, listener); + scriptService, reindexSslConfig, request, resumableSortingField, listener); searchAction.start(); }); } + private static String getOrAddRestartFromField(SearchRequest searchRequest) { + // we keep with the tradition of modifying the input request, though this can lead to strange results (in transport clients). 
+ List> sorts = searchRequest.source().sorts(); + if (sorts != null && sorts.size() >= 1) { + SortBuilder firstSort = sorts.get(0); + if (firstSort instanceof FieldSortBuilder) { + FieldSortBuilder fieldSort = (FieldSortBuilder) firstSort; + if (SeqNoFieldMapper.NAME.equals(fieldSort.getFieldName()) + && fieldSort.order() == SortOrder.ASC) { + return SeqNoFieldMapper.NAME; + } + // todo: support non seq_no fields and descending, but need to check field is numeric and handle missing values too then. + } + return null; + } + + // use unmapped_type to ensure that sorting works when index is newly created without mappings + searchRequest.source().sort(new FieldSortBuilder(SeqNoFieldMapper.NAME).unmappedType("long")); + return SeqNoFieldMapper.NAME; + } + /** * Build the {@link RestClient} used for reindexing from remote clusters. * @param remoteInfo connection information for the remote cluster @@ -170,18 +200,20 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction listener) { + String restartFromField, ActionListener listener) { super(task, /* * We only need the source version if we're going to use it when write and we only do that when the destination request uses * external versioning. */ request.getDestination().versionType() != VersionType.INTERNAL, - false, logger, client, threadPool, request, listener, scriptService, sslConfig); + SeqNoFieldMapper.NAME.equals(restartFromField), logger, client, threadPool, request, listener, + scriptService, sslConfig, restartFromField); } @Override - protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) { + protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy, + String restartFromField) { if (mainRequest.getRemoteInfo() != null) { RemoteInfo remoteInfo = mainRequest.getRemoteInfo(); createdThreads = synchronizedList(new ArrayList<>()); @@ -191,7 +223,7 @@ protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffP this::onScrollResponse, this::finishHim, restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest()); } - return super.buildScrollableResultSource(backoffPolicy); + return super.buildScrollableResultSource(backoffPolicy, restartFromField); } @Override diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java index b98b20e390e57..07a6f3e5f9978 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/TransportUpdateByQueryAction.java @@ -87,7 +87,7 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction onResponse, Consumer fail, RestClient client, BytesReference query, SearchRequest searchRequest) { - super(logger, backoffPolicy, threadPool, countSearchRetry, onResponse, fail); + super(logger, backoffPolicy, threadPool, countSearchRetry, onResponse, fail, null);// todo: handle resume or grace this.query = query; this.searchRequest = searchRequest; this.client = client; } @Override - protected void doStart(RejectAwareActionListener searchListener) { + protected void doStart(TimeValue extraKeepAlive, RejectAwareActionListener searchListener) { lookupRemoteVersion(RejectAwareActionListener.withResponseHandler(searchListener, version -> { remoteVersion = version; execute(RemoteRequestBuilders.initialSearch(searchRequest, query, 
remoteVersion), @@ -97,12 +97,28 @@ private void onStartResponse(RejectAwareActionListener searchListener, } } + @Override + protected void doRestart(TimeValue extraKeepAlive, long restartFromValue, RejectAwareActionListener searchListener) { + assert false; + throw new UnsupportedOperationException("restart during remote reindex not supported yet"); + } + @Override protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, RejectAwareActionListener searchListener) { TimeValue keepAlive = timeValueNanos(searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos()); execute(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener); } + @Override + protected boolean canRestart() { + return false; + } + + @Override + protected String[] indices() { + return searchRequest.indices(); + } + @Override protected void clearScroll(String scrollId, Runnable onCompletion) { client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() { diff --git a/modules/reindex/src/test/java/org/elasticsearch/client/documentation/ReindexDocumentationIT.java b/modules/reindex/src/test/java/org/elasticsearch/client/documentation/ReindexDocumentationIT.java index c2b907acde44e..6ee412860bb2b 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/client/documentation/ReindexDocumentationIT.java +++ b/modules/reindex/src/test/java/org/elasticsearch/client/documentation/ReindexDocumentationIT.java @@ -86,6 +86,8 @@ public void setup() { @SuppressWarnings("unused") public void testReindex() { Client client = client(); + // todo: this is only necessary to ensure the seqno mapping is created. + client().prepareIndex(INDEX_NAME, "_doc", "1").setSource("data", "x").get(); // tag::reindex1 BulkByScrollResponse response = new ReindexRequestBuilder(client, ReindexAction.INSTANCE) @@ -94,6 +96,8 @@ public void testReindex() { .filter(QueryBuilders.matchQuery("category", "xzy")) // <1> .get(); // end::reindex1 + + client().prepareDelete(INDEX_NAME, "_doc", "1").get(); } @SuppressWarnings("unused") diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java index 0afd68296740e..b033fd403e7b6 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/AsyncBulkByScrollActionTests.java @@ -216,7 +216,7 @@ public void testStartNextScrollRetriesOnRejectionAndSucceeds() throws Exception ClientScrollableHitSource hitSource = new ClientScrollableHitSource(logger, buildTestBackoffPolicy(), threadPool, testTask.getWorkerState()::countSearchRetry, r -> fail(), ExceptionsHelper::reThrowIfNotNull, - new ParentTaskAssigningClient(client, localNode, testTask), testRequest.getSearchRequest()); + new ParentTaskAssigningClient(client, localNode, testTask), testRequest.getSearchRequest(), null); hitSource.setScroll(scrollId()); hitSource.startNextScroll(TimeValue.timeValueSeconds(0)); assertBusy(() -> assertEquals(client.scrollsToReject + 1, client.scrollAttempts.get())); @@ -240,7 +240,7 @@ public void testStartNextScrollRetriesOnRejectionButFailsOnTooManyRejections() t ClientScrollableHitSource hitSource = new ClientScrollableHitSource(logger, buildTestBackoffPolicy(), threadPool, testTask.getWorkerState()::countSearchRetry, r -> fail(), validingOnFail, - new 
ParentTaskAssigningClient(client, localNode, testTask), testRequest.getSearchRequest()); + new ParentTaskAssigningClient(client, localNode, testTask), testRequest.getSearchRequest(), null); hitSource.setScroll(scrollId()); hitSource.startNextScroll(TimeValue.timeValueSeconds(0)); assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.scrollAttempts.get())); @@ -733,7 +733,8 @@ private class DummyAsyncBulkByScrollAction extends AbstractAsyncBulkByScrollAction { DummyAsyncBulkByScrollAction() { super(testTask, randomBoolean(), randomBoolean(), AsyncBulkByScrollActionTests.this.logger, - new ParentTaskAssigningClient(client, localNode, testTask), client.threadPool(), testRequest, listener, null, null); + new ParentTaskAssigningClient(client, localNode, testTask), client.threadPool(), testRequest, listener, + null, null, null); } @Override diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ClientScrollableHitSourceTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ClientScrollableHitSourceTests.java index 37425a7c600ef..cdad34cb91304 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ClientScrollableHitSourceTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ClientScrollableHitSourceTests.java @@ -19,17 +19,22 @@ package org.elasticsearch.index.reindex; +import org.apache.logging.log4j.Logger; import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; import org.elasticsearch.action.ActionType; import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.search.ClearScrollAction; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.SearchAction; import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollAction; import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.client.ParentTaskAssigningClient; import org.elasticsearch.client.support.AbstractClient; import org.elasticsearch.common.bytes.BytesArray; @@ -37,6 +42,10 @@ import org.elasticsearch.common.text.Text; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.elasticsearch.search.internal.InternalSearchResponse; @@ -45,20 +54,25 @@ import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.junit.After; +import org.junit.Assert; import org.junit.Before; +import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; +import java.util.function.LongSupplier; import 
java.util.stream.IntStream; import static java.util.Collections.emptyMap; import static org.apache.lucene.util.TestUtil.randomSimpleString; import static org.elasticsearch.common.unit.TimeValue.timeValueSeconds; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.instanceOf; public class ClientScrollableHitSourceTests extends ESTestCase { @@ -85,41 +99,40 @@ public void testRetrySuccess() throws InterruptedException { dotestBasicsWithRetry(retries, 0, retries, e -> fail()); } - private static class ExpectedException extends RuntimeException { - ExpectedException(Throwable cause) { - super(cause); - } - } - public void testRetryFail() { int retries = randomInt(10); - ExpectedException ex = expectThrows(ExpectedException.class, () -> { - dotestBasicsWithRetry(retries, retries+1, retries+1, e -> { throw new ExpectedException(e); }); - }); - assertThat(ex.getCause(), instanceOf(EsRejectedExecutionException.class)); + ExpectedException ex = expectThrows(ExpectedException.class, + () -> dotestBasicsWithRetry(retries, retries+1, retries+1, e -> { throw new ExpectedException(e); }) + ); + assertThat(ex.getCause(), anyOf(instanceOf(EsRejectedExecutionException.class), instanceOf(SearchFailureException.class))); } private void dotestBasicsWithRetry(int retries, int minFailures, int maxFailures, Consumer failureHandler) throws InterruptedException { BlockingQueue responses = new ArrayBlockingQueue<>(100); - MockClient client = new MockClient(threadPool); + MockClient client = new MockClient(threadPool, logger); TaskId parentTask = new TaskId("thenode", randomInt()); AtomicInteger actualSearchRetries = new AtomicInteger(); int expectedSearchRetries = 0; ClientScrollableHitSource hitSource = new ClientScrollableHitSource(logger, BackoffPolicy.constantBackoff(TimeValue.ZERO, retries), - threadPool, actualSearchRetries::incrementAndGet, responses::add, failureHandler, + threadPool, actualSearchRetries::incrementAndGet, response -> handleResponse(responses, response), failureHandler, new ParentTaskAssigningClient(client, parentTask), - new SearchRequest().scroll("1m")); + new SearchRequest().scroll("1m"), SeqNoFieldMapper.NAME); + LongSupplier seqNoGenerator = newSeqNoGenerator(0); + long seqNo = Long.MIN_VALUE; hitSource.start(); - for (int retry = 0; retry < randomIntBetween(minFailures, maxFailures); ++retry) { + for (int retry = 0; retry < randomIntBetween(0, maxFailures); ++retry) { client.fail(SearchAction.INSTANCE, new EsRejectedExecutionException()); client.awaitOperation(); ++expectedSearchRetries; + validateSeqNoCondition(client, seqNo); } - SearchResponse searchResponse = createSearchResponse(); + SearchResponse searchResponse = createSearchResponse(seqNoGenerator); client.respond(SearchAction.INSTANCE, searchResponse); + seqNo = extractSeqNoFromSearchResponse(searchResponse, seqNo); + String scrollId = searchResponse.getScrollId(); for (int i = 0; i < randomIntBetween(1, 10); ++i) { ScrollableHitSource.AsyncResponse asyncResponse = responses.poll(10, TimeUnit.SECONDS); @@ -128,46 +141,173 @@ private void dotestBasicsWithRetry(int retries, int minFailures, int maxFailures assertSameHits(asyncResponse.response().getHits(), searchResponse.getHits().getHits()); asyncResponse.done(TimeValue.ZERO); + ActionType expectedAction = SearchScrollAction.INSTANCE; for (int retry = 0; retry < randomIntBetween(minFailures, maxFailures); ++retry) { - client.fail(SearchScrollAction.INSTANCE, new EsRejectedExecutionException()); + if (failSearch(client, expectedAction, seqNo)) { + if 
(expectedAction == SearchScrollAction.INSTANCE) { + handleClearScrollAction(scrollId, client); + } + expectedAction = SearchAction.INSTANCE; + } client.awaitOperation(); ++expectedSearchRetries; } - searchResponse = createSearchResponse(); - client.respond(SearchScrollAction.INSTANCE, searchResponse); + if (expectedAction == SearchAction.INSTANCE) { + validateSeqNoCondition(client, seqNo); + } + searchResponse = createSearchResponse(seqNoGenerator); + client.respond(expectedAction, searchResponse); + seqNo = extractSeqNoFromSearchResponse(searchResponse, seqNo); + scrollId = searchResponse.getScrollId(); } assertEquals(actualSearchRetries.get(), expectedSearchRetries); } - public void testScrollKeepAlive() { - MockClient client = new MockClient(threadPool); + private void handleResponse(BlockingQueue responses, ScrollableHitSource.AsyncResponse response) { + if (response.response().getFailures().isEmpty() == false) { + throw (RuntimeException) response.response().getFailures().get(0).getReason(); + } + + responses.add(response); + } + + /** + * @return true if search is expected to restart, false if retry scroll. + */ + private boolean failSearch(MockClient client, ActionType expectedAction, long seqNo) { + Exception failure = randomFrom(new EsRejectedExecutionException(), new SearchFailureException()); + if (randomBoolean()) { + client.fail(expectedAction, failure); + return failure instanceof SearchFailureException; + } else { + LongSupplier seqNoGenerator = newSeqNoGenerator(seqNo); + SearchResponse searchResponse = + createSearchResponse(seqNoGenerator, new ShardSearchFailure[] { new ShardSearchFailure(failure) }); + client.respond(expectedAction, searchResponse); + return true; + } + } + + private LongSupplier newSeqNoGenerator(long seqNo) { + return new LongSupplier() { + private long sequence = seqNo; + @Override + public long getAsLong() { + return sequence += randomIntBetween(0, 3); + } + }; + } + + public void testScrollExtraKeepAlive() { + MockClient client = new MockClient(threadPool, logger); TaskId parentTask = new TaskId("thenode", randomInt()); ClientScrollableHitSource hitSource = new ClientScrollableHitSource(logger, BackoffPolicy.constantBackoff(TimeValue.ZERO, 0), - threadPool, () -> fail(), r -> fail(), e -> fail(), new ParentTaskAssigningClient(client, + threadPool, Assert::fail, r -> fail(), e -> fail(), new ParentTaskAssigningClient(client, parentTask), // Set the base for the scroll to wait - this is added to the figure we calculate below - new SearchRequest().scroll(timeValueSeconds(10))); + new SearchRequest().scroll(timeValueSeconds(10)), null); + hitSource.setScroll(generateScrollId()); hitSource.startNextScroll(timeValueSeconds(100)); client.validateRequest(SearchScrollAction.INSTANCE, (SearchScrollRequest r) -> assertEquals(r.scroll().keepAlive().seconds(), 110)); } + public void testRestartExtraKeepAlive() throws InterruptedException { + MockClient client = new MockClient(threadPool, logger); + TaskId parentTask = new TaskId("thenode", randomInt()); + + AtomicInteger retries = new AtomicInteger(); + + ClientScrollableHitSource hitSource = new ClientScrollableHitSource(logger, BackoffPolicy.constantBackoff(TimeValue.ZERO, 1), + threadPool, retries::incrementAndGet, r -> fail(), e -> fail(), new ParentTaskAssigningClient(client, + parentTask), + // Set the base for the scroll to wait - this is added to the figure we calculate below + new SearchRequest().scroll(timeValueSeconds(10)), SeqNoFieldMapper.NAME); + + String scrollId = generateScrollId(); + 
hitSource.setScroll(scrollId); + hitSource.startNextScroll(timeValueSeconds(100)); + client.fail(SearchScrollAction.INSTANCE, new Exception()); + handleClearScrollAction(scrollId, client); + client.awaitOperation(); + client.validateRequest(SearchAction.INSTANCE, + (SearchRequest r) -> assertEquals(r.scroll().keepAlive().seconds(), 110)); + + assertEquals(1, retries.get()); + } + + private void handleClearScrollAction(String expectedScrollId, MockClient client) throws InterruptedException { + client.awaitOperation(); + client.validateRequest(ClearScrollAction.INSTANCE, + request -> assertEquals(Collections.singletonList(expectedScrollId), request.scrollIds()) + ); + if (randomBoolean()) { + client.respond(ClearScrollAction.INSTANCE, new ClearScrollResponse(true, 1)); + } else { + client.fail(ClearScrollAction.INSTANCE, new Exception()); + } + } + private String generateScrollId() { + return randomSimpleString(random(), 1, 10); + } - private SearchResponse createSearchResponse() { - // create a simulated response. - SearchHit hit = new SearchHit(0, "id", new Text("type"), emptyMap()).sourceRef(new BytesArray("{}")); - SearchHits hits = new SearchHits(IntStream.range(0, randomIntBetween(0, 20)).mapToObj(i -> hit).toArray(SearchHit[]::new), + private SearchResponse createSearchResponse(LongSupplier seqNoGenerator) { + return createSearchResponse(seqNoGenerator, null); + } + + private SearchResponse createSearchResponse(LongSupplier seqNoGenerator, ShardSearchFailure[] shardFailures) { + SearchHits hits = new SearchHits(IntStream.range(0, randomIntBetween(0, 20)).mapToObj(i -> createSearchHit(seqNoGenerator)) + .toArray(SearchHit[]::new), new TotalHits(0, TotalHits.Relation.EQUAL_TO),0); InternalSearchResponse internalResponse = new InternalSearchResponse(hits, null, null, null, false, false, 1); - return new SearchResponse(internalResponse, randomSimpleString(random(), 1, 10), 5, 4, 0, randomLong(), null, + return new SearchResponse(internalResponse, generateScrollId(), 5, 4, 0, randomLong(), shardFailures, SearchResponse.Clusters.EMPTY); } + private SearchHit createSearchHit(LongSupplier seqNoGenerator) { + SearchHit hit = new SearchHit(0, "id", new Text("type"), emptyMap()).sourceRef(new BytesArray("{}")); + hit.setSeqNo(seqNoGenerator.getAsLong()); + return hit; + } + + + private void validateSeqNoCondition(MockClient client, long seqNo) { + client.validateRequest(SearchAction.INSTANCE, request -> validateSeqNoCondition(request, seqNo)); + } + + private void validateSeqNoCondition(SearchRequest request, long seqNo) { + if (request.source() != null && request.source().query() != null) { + QueryBuilder query = request.source().query(); + if (query instanceof BoolQueryBuilder) { + BoolQueryBuilder boolQuery = (BoolQueryBuilder) query; + Optional seqNoFilter = boolQuery.filter().stream() + .filter(q -> q instanceof RangeQueryBuilder && ((RangeQueryBuilder) q).fieldName().equals(SeqNoFieldMapper.NAME)) + .findFirst(); + if (seqNoFilter.isPresent()) + query = seqNoFilter.get(); + } + if (query instanceof RangeQueryBuilder && ((RangeQueryBuilder) query).fieldName().equals(SeqNoFieldMapper.NAME)) { + long requestSeqNo = (long) ((RangeQueryBuilder) query).from(); + assertEquals(requestSeqNo, seqNo); + assertNotEquals(requestSeqNo, Long.MIN_VALUE); + return; + } + } + assertEquals(Long.MIN_VALUE, seqNo); + } + + private long extractSeqNoFromSearchResponse(SearchResponse searchResponse, long seqNo) { + if (searchResponse.getHits().getHits().length != 0) { + seqNo = 
searchResponse.getHits().getHits()[searchResponse.getHits().getHits().length-1].getSeqNo(); + } + return seqNo; + } + private void assertSameHits(List actual, SearchHit[] expected) { assertEquals(actual.size(), expected.length); for (int i = 0; i < actual.size(); ++i) { @@ -181,58 +321,84 @@ private void assertSameHits(List actual, Sear } } + private static class ExpectedException extends RuntimeException { + ExpectedException(Throwable cause) { + super(cause); + } + } + + private static class SearchFailureException extends RuntimeException { + } + private static class ExecuteRequest { private final ActionType action; private final Request request; private final ActionListener listener; + private final Exception stackTrace; ExecuteRequest(ActionType action, Request request, ActionListener listener) { this.action = action; this.request = request; this.listener = listener; + this.stackTrace = new Exception(); } - public void respond(ActionType action, Function response) { - assertEquals(action, this.action); - listener.onResponse(response.apply(request)); + public void respond(ActionType action, Function responseFunction, Logger logger) { + validateAction(action); + Response response = responseFunction.apply(request); + logger.debug("Responding to request {} {}", action, response); + listener.onResponse(response); } public void fail(ActionType action, Exception response) { - assertEquals(action, this.action); + validateAction(action); listener.onFailure(response); } public void validateRequest(ActionType action, Consumer validator) { - assertEquals(action, this.action); + validateAction(action); validator.accept(request); } + + private void validateAction(ActionType action) { + if (action.equals(this.action) == false) { + throw new AssertionError("Wrong action, actual: " + this.action +", expected: " + action + ", request: " + request, + stackTrace); + } + assertEquals(action, this.action); + } + + @Override + public String toString() { + return "ExecuteRequest{" + + "action=" + action + + ", request=" + request + + '}'; + } } private static class MockClient extends AbstractClient { - private ExecuteRequest executeRequest; + private final Logger logger; + private final BlockingQueue> requests = new ArrayBlockingQueue<>(100); - MockClient(ThreadPool threadPool) { + MockClient(ThreadPool threadPool, Logger logger) { super(Settings.EMPTY, threadPool); + this.logger = logger; } @Override protected synchronized void doExecute(ActionType action, Request request, ActionListener listener) { - - this.executeRequest = new ExecuteRequest<>(action, request, listener); + logger.debug("Registering request {} {}", action, request); + requests.add(new ExecuteRequest<>(action, request, listener)); this.notifyAll(); } @SuppressWarnings("unchecked") public void respondx(ActionType action, Function response) { - ExecuteRequest executeRequest; - synchronized (this) { - executeRequest = this.executeRequest; - this.executeRequest = null; - } - ((ExecuteRequest) executeRequest).respond(action, response); + ((ExecuteRequest) requests.remove()).respond(action, response, logger); } public void respond(ActionType action, @@ -242,18 +408,15 @@ public void respond(ActionType actio @SuppressWarnings("unchecked") public void fail(ActionType action, Exception response) { - ExecuteRequest executeRequest; - synchronized (this) { - executeRequest = this.executeRequest; - this.executeRequest = null; - } - ((ExecuteRequest) executeRequest).fail(action, response); + logger.debug("Failing request {} {}", action, response); + 
((ExecuteRequest) requests.remove()).fail(action, response); } @SuppressWarnings("unchecked") public void validateRequest(ActionType action, Consumer validator) { - ((ExecuteRequest) executeRequest).validateRequest(action, validator); + assert requests.peek() != null; + ((ExecuteRequest) requests.peek()).validateRequest(action, validator); } @Override @@ -261,9 +424,9 @@ public void close() { } public synchronized void awaitOperation() throws InterruptedException { - if (executeRequest == null) { + if (requests.isEmpty()) { wait(10000); - assertNotNull("Must receive next request within 10s", executeRequest); + assertFalse("Must receive next request within 10s", requests.isEmpty()); } } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexBasicTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexBasicTests.java index 5b7bc0ff7e159..be110518d1904 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexBasicTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexBasicTests.java @@ -199,4 +199,9 @@ public void testMissingSources() { assertThat(response, matcher().created(0).slices(hasSize(0))); } + public void testEmptyIndex() { + createIndex("source"); + reindex().source("source").destination("dest").get(); + assertEquals(0, client().admin().indices().prepareGetIndex().addIndices("dest*").get().getIndices().length); + } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFailureTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFailureTests.java index 4b9a69cfce1c8..14d793bbf1b54 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFailureTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexFailureTests.java @@ -21,6 +21,9 @@ import org.elasticsearch.action.bulk.BulkItemResponse.Failure; import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.search.sort.SortOrder; import java.util.ArrayList; import java.util.List; @@ -126,6 +129,15 @@ public void testResponseOnSearchFailure() throws Exception { assumeFalse("Wasn't able to trigger a reindex failure in " + attempt + " attempts.", true); } + public void testTimelyResponseOnBadSearchRequest() throws Exception { + indexDocs(2); + ReindexRequestBuilder badReindex = reindex().source("source").destination("dest"); + badReindex.source().addSort("_seq_no", SortOrder.ASC).addSort("nonexisting", SortOrder.ASC); + SearchPhaseExecutionException e = + expectThrows(SearchPhaseExecutionException.class,() -> badReindex.get(TimeValue.timeValueSeconds(30))); + assertThat(e.getCause().getMessage(), containsString("nonexisting")); + } + private void indexDocs(int count) throws Exception { List docs = new ArrayList<>(count); for (int i = 0; i < count; i++) { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java index 1859acdb74a2d..6243c23ad67a3 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexMetadataTests.java @@ -77,7 +77,7 @@ protected ReindexRequest request() { private class TestAction extends 
Reindexer.AsyncIndexBySearchAction { TestAction() { super(ReindexMetadataTests.this.task, ReindexMetadataTests.this.logger, null, ReindexMetadataTests.this.threadPool, - null, null, request(), listener()); + null, null, request(), null, listener()); } public ReindexRequest mainRequest() { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexResilientSearchIT.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexResilientSearchIT.java new file mode 100644 index 0000000000000..227d1e6429b03 --- /dev/null +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexResilientSearchIT.java @@ -0,0 +1,151 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.reindex; + +import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.PlainListenableActionFuture; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.query.MatchAllQueryBuilder; +import org.elasticsearch.search.aggregations.BucketOrder; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.support.ValueType; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.test.ESIntegTestCase; +import org.hamcrest.Matchers; + +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.test.ESIntegTestCase.Scope.TEST; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + + +@ESIntegTestCase.ClusterScope(scope = TEST) +public class ReindexResilientSearchIT extends ReindexTestCase { + + @Override + protected boolean ignoreExternalCluster() { + return true; + } + + public void testDataNodeRestart() throws Exception { + internalCluster().ensureAtLeastNumDataNodes(2); + int shardCount = randomIntBetween(2, 10); + createIndex("test", + Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, shardCount) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build()); + + // create index and mapping up front to avoid master involvement, since that can result in an item failing with 
NodeClosedException. + assertAcked(prepareCreate("dest") + .addMapping("_doc", jsonBuilder() + .startObject() + .startObject("properties") + .startObject("data") + .field("type", "long") + .endObject() + .endObject() + .endObject()) + .setSettings(Settings.builder() + .put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, shardCount) + .put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1) + .build())); + + int numberOfDocuments = randomIntBetween(10, 50); + indexRandom(true, IntStream.range(0, numberOfDocuments) + .mapToObj(i -> client().prepareIndex("test", "doc", String.valueOf(i)).setSource("data", i)).collect(Collectors.toList())); + + ensureGreen("test"); + String reindexNode = internalCluster().startCoordinatingOnlyNode(Settings.EMPTY); + NodeClient reindexNodeClient = internalCluster().getInstance(NodeClient.class, reindexNode); + + ReindexRequest request = new ReindexRequest(); + request.setSourceIndices("test").setSourceBatchSize(1); + request.setDestIndex("dest"); + request.setRequestsPerSecond(0.3f); // 30 seconds minimum + request.setAbortOnVersionConflict(false); + if (randomBoolean()) { + request.getSearchRequest().source(new SearchSourceBuilder().query(new MatchAllQueryBuilder())); + } + PlainListenableActionFuture reindexFuture = PlainListenableActionFuture.newListenableFuture(); + Task reindexTask = reindexNodeClient.executeLocally(ReindexAction.INSTANCE, request, reindexFuture); + + assertBusy(() -> { + IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats("test").execute().actionGet(); + assertThat(Stream.of(indicesStatsResponse.getIndex("test").getShards()) + .mapToLong(shardStat -> shardStat.getStats().search.getTotal().getScrollCurrent()).sum(), + Matchers.equalTo((long) shardCount)); + // wait for all initial search to complete, since we do not retry those. + assertThat(client().admin().cluster().prepareListTasks().setActions(SearchAction.NAME).get().getTasks(), Matchers.empty()); + }, 30, TimeUnit.SECONDS); + + for (int i = 0; i < randomIntBetween(1,5); ++i) { + // todo: replace following two lines with below once search fails on RED every time. + internalCluster().restartRandomDataNode(); + ensureGreen(); +// internalCluster().restartRandomDataNode(new InternalTestCluster.RestartCallback() { +// @Override +// public Settings onNodeStopped(String nodeName) throws Exception { +// internalCluster().restartRandomDataNode(); +// return super.onNodeStopped(nodeName); +// } +// }); + } + + rethrottle().setTaskId(new TaskId(reindexNodeClient.getLocalNodeId(), reindexTask.getId())) + .setRequestsPerSecond(Float.POSITIVE_INFINITY).execute().get(); + + BulkByScrollResponse bulkByScrollResponse = reindexFuture.actionGet(30, TimeUnit.SECONDS); + // todo: this assert fails sometimes due to missing retry on transport closed +// assertThat(bulkByScrollResponse.getBulkFailures(), Matchers.empty()); + assertEquals(0, bulkByScrollResponse.getSearchFailures().size()); + + assertSameDocs(numberOfDocuments, "test", "dest"); + } + + private void assertSameDocs(int numberOfDocuments, String... 
indices) { + refresh(indices); + SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource().size(0) + .aggregation(new TermsAggregationBuilder("unique_count", ValueType.LONG) + .field("data") + .order(BucketOrder.count(true)) + .size(numberOfDocuments + 1) + ); + + SearchResponse searchResponse = client().search(new SearchRequest(indices).source(sourceBuilder)).actionGet(); + Terms termsAggregation = searchResponse.getAggregations().get("unique_count"); + assertEquals("Must have a bucket per doc", termsAggregation.getBuckets().size(), numberOfDocuments); + assertEquals("First bucket must have a doc per index", indices.length, termsAggregation.getBuckets().get(0).getDocCount()); + // grouping by unique field data, we are sure that no bucket has more than #indices docs, thus above is enough to check that we + // have the same docs. + } +} diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java index 54a2ea96b25f5..dc538c98c5a17 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexScriptTests.java @@ -105,6 +105,6 @@ protected ReindexRequest request() { @Override protected Reindexer.AsyncIndexBySearchAction action(ScriptService scriptService, ReindexRequest request) { ReindexSslConfig sslConfig = Mockito.mock(ReindexSslConfig.class); - return new Reindexer.AsyncIndexBySearchAction(task, logger, null, threadPool, scriptService, sslConfig, request, listener()); + return new Reindexer.AsyncIndexBySearchAction(task, logger, null, threadPool, scriptService, sslConfig, request, null, listener()); } } diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java index 2b53f2842f164..4be49d35d4b6d 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/ReindexTestCase.java @@ -19,6 +19,7 @@ package org.elasticsearch.index.reindex; +import org.elasticsearch.client.Client; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.test.ESIntegTestCase; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; @@ -42,7 +43,11 @@ protected Collection> nodePlugins() { } protected ReindexRequestBuilder reindex() { - return new ReindexRequestBuilder(client(), ReindexAction.INSTANCE); + return reindex(client()); + } + + protected ReindexRequestBuilder reindex(Client client) { + return new ReindexRequestBuilder(client, ReindexAction.INSTANCE); } protected UpdateByQueryRequestBuilder updateByQuery() { diff --git a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java index f9e952baa127e..5459c64bb75f8 100644 --- a/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java +++ b/modules/reindex/src/test/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSourceTests.java @@ -158,7 +158,7 @@ private void assertLookupRemoteVersion(Version expected, String s) throws Except public void testParseStartOk() throws Exception { AtomicBoolean called = new AtomicBoolean(); - 
sourceWithMockedRemoteCall("start_ok.json").doStart(wrapAsListener(r -> { + sourceWithMockedRemoteCall("start_ok.json").doStart(TimeValue.ZERO, wrapAsListener(r -> { assertFalse(r.isTimedOut()); assertEquals(FAKE_SCROLL_ID, r.getScrollId()); assertEquals(4, r.getTotalHits()); @@ -226,7 +226,7 @@ public void testParseScrollFullyLoadedFrom1_7() throws Exception { */ public void testScanJumpStart() throws Exception { AtomicBoolean called = new AtomicBoolean(); - sourceWithMockedRemoteCall("start_scan.json", "scroll_ok.json").doStart(wrapAsListener(r -> { + sourceWithMockedRemoteCall("start_scan.json", "scroll_ok.json").doStart(TimeValue.ZERO, wrapAsListener(r -> { assertFalse(r.isTimedOut()); assertEquals(FAKE_SCROLL_ID, r.getScrollId()); assertEquals(4, r.getTotalHits()); @@ -266,7 +266,7 @@ public void testParseRejection() throws Exception { assertEquals("{\"test\":\"test1\"}", r.getHits().get(0).getSource().utf8ToString()); called.set(true); }; - sourceWithMockedRemoteCall("rejection.json").doStart(wrapAsListener(checkResponse)); + sourceWithMockedRemoteCall("rejection.json").doStart(TimeValue.ZERO, wrapAsListener(checkResponse)); assertTrue(called.get()); called.set(false); sourceWithMockedRemoteCall("rejection.json").doStartNextScroll("scroll", timeValueMillis(0), wrapAsListener(checkResponse)); @@ -295,7 +295,7 @@ public void testParseFailureWithStatus() throws Exception { assertEquals("{\"test\":\"test10000\"}", r.getHits().get(0).getSource().utf8ToString()); called.set(true); }; - sourceWithMockedRemoteCall("failure_with_status.json").doStart(wrapAsListener(checkResponse)); + sourceWithMockedRemoteCall("failure_with_status.json").doStart(TimeValue.ZERO, wrapAsListener(checkResponse)); assertTrue(called.get()); called.set(false); sourceWithMockedRemoteCall("failure_with_status.json").doStartNextScroll("scroll", timeValueMillis(0), @@ -317,7 +317,7 @@ public void testParseRequestFailure() throws Exception { assertEquals(14, failure.getColumnNumber()); called.set(true); }; - sourceWithMockedRemoteCall("request_failure.json").doStart(wrapAsListener(checkResponse)); + sourceWithMockedRemoteCall("request_failure.json").doStart(TimeValue.ZERO, wrapAsListener(checkResponse)); assertTrue(called.get()); called.set(false); sourceWithMockedRemoteCall("request_failure.json").doStartNextScroll("scroll", timeValueMillis(0), wrapAsListener(checkResponse)); @@ -369,7 +369,7 @@ public void testThreadContextRestored() throws Exception { String header = randomAlphaOfLength(5); threadPool.getThreadContext().putHeader("test", header); AtomicBoolean called = new AtomicBoolean(); - sourceWithMockedRemoteCall("start_ok.json").doStart(wrapAsListener(r -> { + sourceWithMockedRemoteCall("start_ok.json").doStart(TimeValue.ZERO, wrapAsListener(r -> { assertEquals(header, threadPool.getThreadContext().getHeader("test")); called.set(true); })); diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/35_search_failures.yml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/35_search_failures.yml index 5bee4d9844d30..d2aa0f82fad21 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/35_search_failures.yml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/35_search_failures.yml @@ -28,19 +28,11 @@ source: throw new IllegalArgumentException("Cats!") dest: index: dest - - match: {created: 0} - - match: {updated: 0} - - match: {version_conflicts: 0} - - match: {batches: 0} - - match: {failures.0.shard: 0} - - match: {failures.0.index: source} - - 
is_true: failures.0.node - # TODO: Changed the message because of exception serialization - - match: {failures.0.reason.type: exception} - # TODO: Changed the message because of exception serialization - - match: {failures.0.reason.reason: "Elasticsearch exception [type=script_exception, reason=runtime error]"} - # TODO: Changed the message because of exception serialization - - match: {failures.0.reason.caused_by.type: exception} - # TODO: Changed the message because of exception serialization - - match: {failures.0.reason.caused_by.reason: "Elasticsearch exception [type=illegal_argument_exception, reason=Cats!]"} - - gte: { took: 0 } + # TODO: revisit this once we keep original type. + - match: {error.root_cause.0.type: exception} + # TODO: revisit this once we keep original reason message. + - match: {error.root_cause.0.reason: "Elasticsearch exception [type=search_phase_execution_exception, reason=Partial shards failure]"} + - match: {error.root_cause.0.phase: query} + - match: {error.type: status_exception} + - match: {error.reason: "Elasticsearch exception [type=search_phase_execution_exception, reason=Partial shards failure]"} + # TODO: we kind of change the response format here, but we never documented this so OK? diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java b/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java index 04bfb1c36adfc..9b9b2fb524069 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java @@ -39,7 +39,10 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.mapper.RoutingFieldMapper; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.RangeQueryBuilder; import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -49,7 +52,6 @@ import static java.util.Collections.emptyList; import static java.util.Collections.unmodifiableList; import static org.elasticsearch.common.unit.TimeValue.timeValueNanos; -import static org.elasticsearch.common.util.CollectionUtils.isEmpty; /** * A scrollable source of hits from a {@linkplain Client} instance. @@ -57,33 +59,68 @@ public class ClientScrollableHitSource extends ScrollableHitSource { private final ParentTaskAssigningClient client; private final SearchRequest firstSearchRequest; + private final String restartFromField; public ClientScrollableHitSource(Logger logger, BackoffPolicy backoffPolicy, ThreadPool threadPool, Runnable countSearchRetry, Consumer onResponse, Consumer fail, - ParentTaskAssigningClient client, SearchRequest firstSearchRequest) { - super(logger, backoffPolicy, threadPool, countSearchRetry, onResponse, fail); + ParentTaskAssigningClient client, SearchRequest firstSearchRequest, String restartFromField) { + super(logger, backoffPolicy, threadPool, countSearchRetry, onResponse, fail, restartFromField); this.client = client; this.firstSearchRequest = firstSearchRequest; + this.restartFromField = restartFromField; } @Override - public void doStart(RejectAwareActionListener searchListener) { - if (logger.isDebugEnabled()) { - logger.debug("executing initial scroll against {}", - isEmpty(firstSearchRequest.indices()) ? 
"all indices" : firstSearchRequest.indices()); + public void doStart(TimeValue extraKeepAlive, RejectAwareActionListener searchListener) { + SearchRequest searchRequest = firstSearchRequest; + if (extraKeepAlive != null && extraKeepAlive.nanos() != 0) { + searchRequest = new SearchRequest(searchRequest).scroll(keepAliveTime(extraKeepAlive)); } - client.search(firstSearchRequest, wrapListener(searchListener)); + client.search(searchRequest, wrapListener(searchListener)); + } + + @Override + protected void doRestart(TimeValue extraKeepAlive, long restartFromValue, RejectAwareActionListener searchListener) { + SearchRequest restartRequest = createRestartFromRequest(restartFromValue, extraKeepAlive); + client.search(restartRequest, wrapListener(searchListener)); + } + + private SearchRequest createRestartFromRequest(long restartFromValue, TimeValue extraKeepAlive) { + SearchRequest restartFromRequest = new SearchRequest(firstSearchRequest).scroll(keepAliveTime(extraKeepAlive)); + RangeQueryBuilder restartFromFilter = new RangeQueryBuilder(restartFromField).gte(restartFromValue); + SearchSourceBuilder newSearchSourceBuilder = + restartFromRequest.source() != null ? restartFromRequest.source().copy() : new SearchSourceBuilder(); + if (newSearchSourceBuilder.query() == null) { + newSearchSourceBuilder.query(restartFromFilter); + } else { + newSearchSourceBuilder.query(new BoolQueryBuilder().filter(restartFromRequest.source().query()).filter(restartFromFilter)); + } + restartFromRequest.source(newSearchSourceBuilder); + return restartFromRequest; + } + + protected String[] indices() { + return firstSearchRequest.indices(); } @Override protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, RejectAwareActionListener searchListener) { SearchScrollRequest request = new SearchScrollRequest(); // Add the wait time into the scroll timeout so it won't timeout while we wait for throttling - request.scrollId(scrollId).scroll(timeValueNanos(firstSearchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos())); + request.scrollId(scrollId).scroll(keepAliveTime(extraKeepAlive)); client.searchScroll(request, wrapListener(searchListener)); } - private ActionListener wrapListener(RejectAwareActionListener searchListener) { + @Override + protected boolean canRestart() { + return restartFromField != null; + } + + private TimeValue keepAliveTime(TimeValue extraKeepAlive) { + return timeValueNanos(firstSearchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos()); + } + + private ActionListener wrapListener(RejectAwareActionListener searchListener) { return new ActionListener<>() { @Override public void onResponse(SearchResponse searchResponse) { diff --git a/server/src/main/java/org/elasticsearch/index/reindex/RetryListener.java b/server/src/main/java/org/elasticsearch/index/reindex/RetryListener.java index d197e1bed12f9..bd1e2ef47513c 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/RetryListener.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/RetryListener.java @@ -33,37 +33,73 @@ class RetryListener implements RejectAwareActionListener retries; private final ThreadPool threadPool; + private final Consumer> restartHandler; private final Consumer> retryScrollHandler; private final ActionListener delegate; private int retryCount = 0; + private boolean forceRestart = false; RetryListener(Logger logger, ThreadPool threadPool, BackoffPolicy backoffPolicy, - Consumer> retryScrollHandler, - ActionListener delegate) { + Consumer> restartHandler, + Consumer> 
retryScrollHandler, + ActionListener delegate) { this.logger = logger; this.threadPool = threadPool; this.retries = backoffPolicy.iterator(); + this.restartHandler = restartHandler; this.retryScrollHandler = retryScrollHandler; this.delegate = delegate; } @Override public void onResponse(ScrollableHitSource.Response response) { + if (response.getFailures().isEmpty() == false) { + // some but not all shards failed, we cannot process data, since our resume marker would progress too much. + if (retries.hasNext()) { + TimeValue delay = retries.next(); + logger.trace( + () -> new ParameterizedMessage("retrying rejected search after [{}] for shard failures [{}]", + delay, + response.getFailures())); + + forceRestart = true; + schedule(() -> restartHandler.accept(this), delay); + return; + } // else respond to let action fail. + } + logger.debug("scroll returned [{}] documents with a scroll id of [{}]", response.getHits().size(), response.getScrollId()); delegate.onResponse(response); } @Override public void onFailure(Exception e) { - delegate.onFailure(e); + handleException(e, + delay -> { + logger.trace(() -> new ParameterizedMessage("restarting failed search after [{}]", delay), e); + forceRestart = true; + schedule(() -> restartHandler.accept(this), delay); + }); } @Override public void onRejection(Exception e) { + handleException(e, + delay -> { + if (forceRestart) { + logger.trace(() -> new ParameterizedMessage("restarting rejected search after [{}]", delay), e); + schedule(() -> restartHandler.accept(this), delay); + } else { + logger.trace(() -> new ParameterizedMessage("retrying rejected scroll after [{}]", delay), e); + schedule(() -> retryScrollHandler.accept(this), delay); + } + }); + } + + public void handleException(Exception e, Consumer action) { if (retries.hasNext()) { retryCount += 1; TimeValue delay = retries.next(); - logger.trace(() -> new ParameterizedMessage("retrying rejected search after [{}]", delay), e); - schedule(() -> retryScrollHandler.accept(this), delay); + action.accept(delay); } else { logger.warn(() -> new ParameterizedMessage( "giving up on search because we retried [{}] times without success", retryCount), e); diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java b/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java index 07d22ddb663fe..1c36db46c9a5b 100644 --- a/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java +++ b/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.xcontent.ToXContentObject; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; @@ -46,13 +47,15 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.ToLongFunction; import static java.util.Objects.requireNonNull; +import static org.elasticsearch.common.util.CollectionUtils.isEmpty; /** * A scrollable source of results. Pumps data out into the passed onResponse consumer. Same data may come out several times in case - * of failures during searching (though not yet). 
diff --git a/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java b/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java
index 07d22ddb663fe..1c36db46c9a5b 100644
--- a/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java
+++ b/server/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java
@@ -36,6 +36,7 @@
 import org.elasticsearch.common.xcontent.ToXContentObject;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.mapper.SeqNoFieldMapper;
 import org.elasticsearch.index.seqno.SequenceNumbers;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
@@ -46,13 +47,15 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.ToLongFunction;
 
 import static java.util.Objects.requireNonNull;
+import static org.elasticsearch.common.util.CollectionUtils.isEmpty;
 
 /**
  * A scrollable source of results. Pumps data out into the passed onResponse consumer. Same data may come out several times in case
- * of failures during searching (though not yet). Once the onResponse consumer is done, it should call AsyncResponse.done(time) to receive
- * more data (only receives one response at a time).
+ * of failures during searching. Once the onResponse consumer is done, it should call AsyncResponse.done(time) to receive more data
+ * (only receives one response at a time).
  */
 public abstract class ScrollableHitSource {
     private final AtomicReference<String> scrollId = new AtomicReference<>();
@@ -60,38 +63,126 @@ public abstract class ScrollableHitSource {
     protected final Logger logger;
     protected final BackoffPolicy backoffPolicy;
     protected final ThreadPool threadPool;
-    protected final Runnable countSearchRetry;
+    private final Runnable countSearchRetry;
     private final Consumer<AsyncResponse> onResponse;
-    protected final Consumer<Exception> fail;
+    private final Consumer<Exception> fail;
+    private final ToLongFunction<Hit> restartFromValueFunction;
+    private long restartFromValue = Long.MIN_VALUE; // needs refinement if we ever support descending order.
 
     public ScrollableHitSource(Logger logger, BackoffPolicy backoffPolicy, ThreadPool threadPool, Runnable countSearchRetry,
-                               Consumer<AsyncResponse> onResponse, Consumer<Exception> fail) {
+                               Consumer<AsyncResponse> onResponse, Consumer<Exception> fail, String restartFromField) {
         this.logger = logger;
         this.backoffPolicy = backoffPolicy;
         this.threadPool = threadPool;
         this.countSearchRetry = countSearchRetry;
         this.onResponse = onResponse;
         this.fail = fail;
+        if (restartFromField != null) {
+            if (SeqNoFieldMapper.NAME.equals(restartFromField)) {
+                restartFromValueFunction = Hit::getSeqNo;
+            } else {
+                restartFromValueFunction = hit -> Long.MIN_VALUE;
+                // todo: support non-seqno fields.
+                // We need to extract the field, either from the source or by asking for it explicitly.
+                // We also need to handle missing values.
+                // hit -> ((Number) hit.field(restartFromField).getValue()).longValue();
+            }
+        } else {
+            restartFromValueFunction = hit -> Long.MIN_VALUE;
+        }
+    }
+
+    public final long restartFromValue() {
+        return restartFromValue;
+    }
+
+    public final void restartFromValue(long restartFromValue) {
+        this.restartFromValue = restartFromValue;
     }
 
     public final void start() {
-        doStart(createRetryListener(this::doStart));
+        if (logger.isDebugEnabled()) {
+            logger.debug("executing initial scroll against {}",
+                isEmpty(indices()) ? "all indices" : indices());
+        }
+
+        // todo: we never restart the original request; if it fails, we probably want fast feedback. But when we add
+        // resuming from seqNo, we should also retry the original request, so this needs some care at that time.
+        // For now, rejections (429) still lead to retries, as they always have.
+        restartNoLogging(TimeValue.ZERO, createRetryListenerNoRestart(this::restart));
+    }
+
+    private void restart(RejectAwareActionListener<Response> searchListener) {
+        restart(TimeValue.ZERO, searchListener);
     }
 
-    private RetryListener createRetryListener(Consumer<RejectAwareActionListener<Response>> retryHandler) {
-        Consumer<RejectAwareActionListener<Response>> countingRetryHandler = listener -> {
-            countSearchRetry.run();
-            retryHandler.accept(listener);
+    private void restart(TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("restarting search against {} from resume marker {}",
+                isEmpty(indices()) ? "all indices" : indices(), restartFromValue());
+        }
+        restartNoLogging(extraKeepAlive, searchListener);
+    }
+
+    private void restartNoLogging(TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener) {
+        String scrollId = this.scrollId.get();
+        if (scrollId != null) {
+            // we do not bother waiting for the scroll to be cleared, at least not yet. We could implement a policy of
+            // having no more than x old scrolls outstanding, waiting for their timeout before continuing (we know the timeout).
+            // A flaky connection could in principle lead to many scrolls within the timeout window, so this could be worth pursuing.
+            clearScroll(scrollId, () -> {});
+            this.scrollId.set(null);
+        }
+        if (restartFromValue == Long.MIN_VALUE) {
+            doStart(extraKeepAlive, searchListener);
+        } else {
+            doRestart(extraKeepAlive, restartFromValue, searchListener);
+        }
+    }
+
+    private RetryListener createRetryListener(Consumer<RejectAwareActionListener<Response>> restartHandler,
+                                              Consumer<RejectAwareActionListener<Response>> retryScrollHandler) {
+        if (canRestart()) {
+            return new RetryListener(logger, threadPool, backoffPolicy,
+                countRetries(restartHandler), countRetries(retryScrollHandler),
+                ActionListener.wrap(this::onResponse, fail));
+        } else {
+            return createRetryListenerNoRestart(retryScrollHandler);
+        }
+    }
+
+    private RetryListener createRetryListenerNoRestart(Consumer<RejectAwareActionListener<Response>> retryScrollHandler) {
+        return new RetryListener(logger, threadPool, backoffPolicy,
+            x -> { throw new UnsupportedOperationException(); }, countRetries(retryScrollHandler),
+            ActionListener.wrap(this::onResponse, fail)) {
+            @Override
+            public void onResponse(Response response) {
+                ScrollableHitSource.this.onResponse(response);
+            }
+
+            @Override
+            public void onFailure(Exception e) {
+                fail.accept(e);
+            }
+        };
-        return new RetryListener(logger, threadPool, backoffPolicy, countingRetryHandler,
-            ActionListener.wrap(this::onResponse, fail));
+    }
+
+    private Consumer<RejectAwareActionListener<Response>> countRetries(Consumer<RejectAwareActionListener<Response>> retryHandler) {
+        return listener -> {
+            countSearchRetry.run();
+            retryHandler.accept(listener);
+        };
     }
 
     // package private for tests.
     final void startNextScroll(TimeValue extraKeepAlive) {
-        startNextScroll(extraKeepAlive, createRetryListener(listener -> startNextScroll(extraKeepAlive, listener)));
+        startNextScroll(extraKeepAlive, createRetryListener(listener -> restart(extraKeepAlive, listener),
+            listener -> startNextScroll(extraKeepAlive, listener)
+        ));
     }
 
     private void startNextScroll(TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener) {
+        assert scrollId.get() != null;
         doStartNextScroll(scrollId.get(), extraKeepAlive, searchListener);
     }
 
@@ -108,11 +199,21 @@ public Response response() {
             @Override
             public void done(TimeValue extraKeepAlive) {
                 assert alreadyDone.compareAndSet(false, true);
+                restartFromValue = extractRestartFromValue(response, restartFromValue);
                 startNextScroll(extraKeepAlive);
             }
         });
     }
 
+    private long extractRestartFromValue(Response response, long defaultValue) {
+        List<? extends Hit> hits = response.hits;
+        if (hits.size() != 0) {
+            return restartFromValueFunction.applyAsLong(hits.get(hits.size() - 1));
+        } else {
+            return defaultValue;
+        }
+    }
+
     public final void close(Runnable onCompletion) {
         String scrollId = this.scrollId.get();
         if (Strings.hasLength(scrollId)) {
@@ -123,10 +224,13 @@ public final void close(Runnable onCompletion) {
     }
 
     // following is the SPI to be implemented.
-    protected abstract void doStart(RejectAwareActionListener<Response> searchListener);
+    protected abstract void doStart(TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener);
+    protected abstract void doRestart(TimeValue extraKeepAlive, long restartFromValue, RejectAwareActionListener<Response> searchListener);
     protected abstract void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener);
+    protected abstract boolean canRestart();
+    protected abstract String[] indices();
 
     /**
      * Called to clear a scroll id.
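Note: the net effect of the bookkeeping above is that every acknowledged batch advances a resume marker, and any restart replays from that marker. Roughly, as one illustrative loop (not patch code; moreHits, nextBatch, deliver and lastHit are hypothetical stand-ins, and an ascending _seq_no sort is assumed):

    long marker = Long.MIN_VALUE;                   // Long.MIN_VALUE == nothing delivered yet
    while (moreHits) {
        Response batch = nextBatch();               // scroll, or restarted search after a failure
        deliver(batch);                             // consumer processes, then calls AsyncResponse.done(...)
        if (batch.getHits().isEmpty() == false) {
            marker = lastHit(batch).getSeqNo();     // what restartFromValueFunction extracts
        }
    }
    // on failure: marker == Long.MIN_VALUE reruns the original request (doStart),
    // anything else reruns it with a _seq_no >= marker filter (doRestart)

Advancing the marker only inside AsyncResponse.done() ensures it never runs ahead of what the consumer has actually processed.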
diff --git a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java
index bb3dd945db95a..e3ba0861cd2fa 100644
--- a/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java
+++ b/server/src/main/java/org/elasticsearch/search/builder/SearchSourceBuilder.java
@@ -960,6 +960,13 @@ public SearchSourceBuilder copyWithNewSlice(SliceBuilder slice) {
         return shallowCopy(queryBuilder, postQueryBuilder, aggregations, slice, sorts, rescoreBuilders, highlightBuilder);
     }
 
+    /**
+     * @return a shallow copy of this builder.
+     */
+    public SearchSourceBuilder copy() {
+        return shallowCopy(queryBuilder, postQueryBuilder, aggregations, sliceBuilder, sorts, rescoreBuilders, highlightBuilder);
+    }
+
     /**
      * Create a shallow copy of this source replacing {@link #queryBuilder}, {@link #postQueryBuilder}, and {@link #sliceBuilder}. Used by
     * {@link #rewrite(QueryRewriteContext)} and {@link #copyWithNewSlice(SliceBuilder)}.
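A quick usage sketch for the new copy() (illustrative only; the term query and the marker value 42 are stand-ins, and the stock QueryBuilders helpers are assumed):

    SearchSourceBuilder original = new SearchSourceBuilder()
        .query(QueryBuilders.termQuery("user", "kimchy"));
    SearchSourceBuilder adjusted = original.copy()
        .query(QueryBuilders.boolQuery()
            .filter(original.query())
            .filter(QueryBuilders.rangeQuery("_seq_no").gte(42)));
    // original.query() is still the bare term query; only the copy was re-pointed.

The copy is shallow, so mutating a shared sub-builder (for example the highlighter) would affect both instances; swapping in a whole new query, as the reindex restart path does, is the intended use.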