Skip to content

Commit

Permalink
Reindex search resiliency (#45497)
Browse files Browse the repository at this point in the history
Local reindex can now survive losing data nodes that contain source
data. The original query will be restarted with a filter for
`_seq_no >= last_seq_no` when a failure is detected.

The original/first search request is not retried/restarted, since that was
not the previous behavior and retrying it would lead to long waits before
reporting that a search request is invalid.

Part of #42612 and split out from #43187
  • Loading branch information
henningandersen authored Aug 19, 2019
1 parent 5ff49ca commit 3ee5c4f
Show file tree
Hide file tree
Showing 20 changed files with 691 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, Request mainRequest, ActionListener<BulkByScrollResponse> listener,
@Nullable ScriptService scriptService, @Nullable ReindexSslConfig sslConfig) {
@Nullable ScriptService scriptService, @Nullable ReindexSslConfig sslConfig,
@Nullable String restartFromField) {
this.task = task;
this.scriptService = scriptService;
this.sslConfig = sslConfig;
Expand All @@ -135,7 +136,9 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
this.listener = listener;
BackoffPolicy backoffPolicy = buildBackoffPolicy();
bulkRetry = new Retry(BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool);
scrollSource = buildScrollableResultSource(backoffPolicy);
// todo: this is trappy, since if a subclass override relies on subclass fields, they are not initialized. We should fix
// to simply pass in the hit-source.
scrollSource = buildScrollableResultSource(backoffPolicy, restartFromField);
scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null");
/*
* Default to sorting by doc. We can't do this in the request itself because it is normal to *add* to the sorts rather than replace
Expand Down Expand Up @@ -213,10 +216,11 @@ private BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs)
return bulkRequest;
}

protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy,
String restartFromField) {
return new ClientScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry,
this::onScrollResponse, this::finishHim, client,
mainRequest.getSearchRequest());
mainRequest.getSearchRequest(), restartFromField);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ public class AsyncDeleteByQueryAction extends AbstractAsyncBulkByScrollAction<De
/**
 * Delete-by-query implementation of the scroll/bulk action.
 * Passes needsSourceDocumentVersions=false and needsSourceDocumentSeqNoAndPrimaryTerm=true:
 * deletes are issued with seq_no/primary_term based optimistic concurrency control, not versions.
 * sslConfig and restartFromField are {@code null} — neither applies to local delete-by-query.
 */
public AsyncDeleteByQueryAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
                                ThreadPool threadPool, DeleteByQueryRequest request, ScriptService scriptService,
                                ActionListener<BulkByScrollResponse> listener) {
    super(task, false, true, logger, client, threadPool, request, listener,
        scriptService, null, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.ParentTaskAssigningClient;
import org.elasticsearch.client.RestClient;
Expand All @@ -47,10 +48,14 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
Expand Down Expand Up @@ -92,17 +97,42 @@ public void initTask(BulkByScrollTask task, ReindexRequest request, ActionListen
}

public void execute(BulkByScrollTask task, ReindexRequest request, ActionListener<BulkByScrollResponse> listener) {
request.getSearchRequest().allowPartialSearchResults(false);
// Notice that this is called both on leader and workers when slicing.
String resumableSortingField = request.getRemoteInfo() == null ? getOrAddRestartFromField(request.getSearchRequest()) : null;

BulkByScrollParallelizationHelper.executeSlicedAction(task, request, ReindexAction.INSTANCE, listener, client,
clusterService.localNode(),
() -> {
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(), task);
AsyncIndexBySearchAction searchAction = new AsyncIndexBySearchAction(task, logger, assigningClient, threadPool,
scriptService, reindexSslConfig, request, listener);
scriptService, reindexSslConfig, request, resumableSortingField, listener);
searchAction.start();
});

}

/**
 * Determines the field the search can be restarted from after a failure, adding an
 * ascending {@code _seq_no} sort to the request when no sort is present.
 * Note: we keep with the tradition of modifying the input request, though this can lead to
 * strange results (in transport clients).
 *
 * @return {@link SeqNoFieldMapper#NAME} when restart is possible, otherwise {@code null}
 */
private static String getOrAddRestartFromField(SearchRequest searchRequest) {
    List<SortBuilder<?>> sorts = searchRequest.source().sorts();
    if (sorts == null || sorts.isEmpty()) {
        // No explicit sort: impose an ascending _seq_no sort ourselves.
        // use unmapped_type to ensure that sorting works when index is newly created without mappings
        searchRequest.source().sort(new FieldSortBuilder(SeqNoFieldMapper.NAME).unmappedType("long"));
        return SeqNoFieldMapper.NAME;
    }
    SortBuilder<?> primarySort = sorts.get(0);
    if (primarySort instanceof FieldSortBuilder) {
        FieldSortBuilder fieldSort = (FieldSortBuilder) primarySort;
        boolean seqNoAscending = SeqNoFieldMapper.NAME.equals(fieldSort.getFieldName())
            && fieldSort.order() == SortOrder.ASC;
        if (seqNoAscending) {
            return SeqNoFieldMapper.NAME;
        }
        // todo: support non seq_no fields and descending, but need to check field is numeric and handle missing values too then.
    }
    // Caller-supplied sort we cannot resume from.
    return null;
}

/**
* Build the {@link RestClient} used for reindexing from remote clusters.
* @param remoteInfo connection information for the remote cluster
Expand Down Expand Up @@ -170,18 +200,20 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Re

/**
 * Reindex implementation of the scroll/bulk action.
 *
 * @param restartFromField sort field used to restart the search after a failure, or {@code null}
 */
AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
                         ThreadPool threadPool, ScriptService scriptService, ReindexSslConfig sslConfig, ReindexRequest request,
                         String restartFromField, ActionListener<BulkByScrollResponse> listener) {
    super(task,
        /*
         * We only need the source version if we're going to use it when write and we only do that when the destination request uses
         * external versioning.
         */
        request.getDestination().versionType() != VersionType.INTERNAL,
        // seq_no and primary_term are needed from source hits when we may restart from _seq_no.
        SeqNoFieldMapper.NAME.equals(restartFromField), logger, client, threadPool, request, listener,
        scriptService, sslConfig, restartFromField);
}

@Override
protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy,
String restartFromField) {
if (mainRequest.getRemoteInfo() != null) {
RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
createdThreads = synchronizedList(new ArrayList<>());
Expand All @@ -191,7 +223,7 @@ protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffP
this::onScrollResponse, this::finishHim,
restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest());
}
return super.buildScrollableResultSource(backoffPolicy);
return super.buildScrollableResultSource(backoffPolicy, restartFromField);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Up
super(task,
// use sequence number powered optimistic concurrency control
false, true,
logger, client, threadPool, request, listener, scriptService, null);
logger, client, threadPool, request, listener, scriptService, null, null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,14 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
/**
 * Hit source that scrolls a remote cluster over its REST client.
 * restartFromField is {@code null}: restarting after failure is not supported for
 * remote reindex (see {@code doRestart}/{@code canRestart}).
 */
public RemoteScrollableHitSource(Logger logger, BackoffPolicy backoffPolicy, ThreadPool threadPool, Runnable countSearchRetry,
                                 Consumer<AsyncResponse> onResponse, Consumer<Exception> fail,
                                 RestClient client, BytesReference query, SearchRequest searchRequest) {
    super(logger, backoffPolicy, threadPool, countSearchRetry, onResponse, fail, null); // todo: handle resume or grace
    this.query = query;
    this.searchRequest = searchRequest;
    this.client = client;
}

@Override
protected void doStart(RejectAwareActionListener<Response> searchListener) {
protected void doStart(TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener) {
lookupRemoteVersion(RejectAwareActionListener.withResponseHandler(searchListener, version -> {
remoteVersion = version;
execute(RemoteRequestBuilders.initialSearch(searchRequest, query, remoteVersion),
Expand All @@ -97,12 +97,28 @@ private void onStartResponse(RejectAwareActionListener<Response> searchListener,
}
}

@Override
protected void doRestart(TimeValue extraKeepAlive, long restartFromValue, RejectAwareActionListener<Response> searchListener) {
// Unreachable in practice: canRestart() returns false for remote reindex, so callers
// should never attempt a restart. The assert catches programming errors in tests.
assert false;
throw new UnsupportedOperationException("restart during remote reindex not supported yet");
}

@Override
protected void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, RejectAwareActionListener<Response> searchListener) {
    // Extend the request's configured keep-alive by the extra grace period before fetching the next batch.
    long totalKeepAliveNanos = searchRequest.scroll().keepAlive().nanos() + extraKeepAlive.nanos();
    TimeValue keepAlive = timeValueNanos(totalKeepAliveNanos);
    execute(RemoteRequestBuilders.scroll(scrollId, keepAlive, remoteVersion), RESPONSE_PARSER, searchListener);
}

@Override
protected boolean canRestart() {
// Remote reindex cannot resume a failed search yet (see doRestart), so report false.
return false;
}

@Override
protected String[] indices() {
// Indices targeted on the remote cluster, taken straight from the search request.
return searchRequest.indices();
}

@Override
protected void clearScroll(String scrollId, Runnable onCompletion) {
client.performRequestAsync(RemoteRequestBuilders.clearScroll(scrollId, remoteVersion), new ResponseListener() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public void setup() {
@SuppressWarnings("unused")
public void testReindex() {
Client client = client();
// todo: this is only necessary to ensure the seqno mapping is created.
client().prepareIndex(INDEX_NAME, "_doc", "1").setSource("data", "x").get();
// tag::reindex1
BulkByScrollResponse response =
new ReindexRequestBuilder(client, ReindexAction.INSTANCE)
Expand All @@ -94,6 +96,8 @@ public void testReindex() {
.filter(QueryBuilders.matchQuery("category", "xzy")) // <1>
.get();
// end::reindex1

client().prepareDelete(INDEX_NAME, "_doc", "1").get();
}

@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public void testStartNextScrollRetriesOnRejectionAndSucceeds() throws Exception
ClientScrollableHitSource hitSource = new ClientScrollableHitSource(logger, buildTestBackoffPolicy(),
threadPool,
testTask.getWorkerState()::countSearchRetry, r -> fail(), ExceptionsHelper::reThrowIfNotNull,
new ParentTaskAssigningClient(client, localNode, testTask), testRequest.getSearchRequest());
new ParentTaskAssigningClient(client, localNode, testTask), testRequest.getSearchRequest(), null);
hitSource.setScroll(scrollId());
hitSource.startNextScroll(TimeValue.timeValueSeconds(0));
assertBusy(() -> assertEquals(client.scrollsToReject + 1, client.scrollAttempts.get()));
Expand All @@ -240,7 +240,7 @@ public void testStartNextScrollRetriesOnRejectionButFailsOnTooManyRejections() t
ClientScrollableHitSource hitSource = new ClientScrollableHitSource(logger, buildTestBackoffPolicy(),
threadPool,
testTask.getWorkerState()::countSearchRetry, r -> fail(), validingOnFail,
new ParentTaskAssigningClient(client, localNode, testTask), testRequest.getSearchRequest());
new ParentTaskAssigningClient(client, localNode, testTask), testRequest.getSearchRequest(), null);
hitSource.setScroll(scrollId());
hitSource.startNextScroll(TimeValue.timeValueSeconds(0));
assertBusy(() -> assertEquals(testRequest.getMaxRetries() + 1, client.scrollAttempts.get()));
Expand Down Expand Up @@ -733,7 +733,8 @@ private class DummyAsyncBulkByScrollAction
extends AbstractAsyncBulkByScrollAction<DummyAbstractBulkByScrollRequest, DummyTransportAsyncBulkByScrollAction> {
DummyAsyncBulkByScrollAction() {
    // scriptService, sslConfig and restartFromField are all null: not exercised by these tests.
    super(testTask, randomBoolean(), randomBoolean(), AsyncBulkByScrollActionTests.this.logger,
        new ParentTaskAssigningClient(client, localNode, testTask), client.threadPool(), testRequest, listener,
        null, null, null);
}

@Override
Expand Down
Loading

0 comments on commit 3ee5c4f

Please sign in to comment.