Reindex search resiliency prototype #43187

Closed
@@ -101,7 +101,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
private final AtomicLong startTime = new AtomicLong(-1);
private final Set<String> destinationIndices = Collections.newSetFromMap(new ConcurrentHashMap<>());

private final ParentTaskAssigningClient client;
protected final ParentTaskAssigningClient client;
private final ActionListener<BulkByScrollResponse> listener;
private final Retry bulkRetry;
private final ScrollableHitSource scrollSource;
@@ -112,6 +112,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
* {@link RequestWrapper} completely.
*/
private final BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> scriptApplier;
private int lastBatchSize;

public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
@@ -138,6 +139,8 @@ public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourc
* Default to sorting by doc. We can't do this in the request itself because it is normal to *add* to the sorts rather than replace
* them and if we add _doc as the first sort by default then sorts will never work.... So we add it here, only if there isn't
* another sort.
*
* Notice that this is no longer ever applied during reindex.
*/
final SearchSourceBuilder sourceBuilder = mainRequest.getSearchRequest().source();
List<SortBuilder<?>> sorts = sourceBuilder.sorts();
@@ -211,8 +214,9 @@ private BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs)
}

protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
return new ClientScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry, this::finishHim, client,
mainRequest.getSearchRequest());
return new ClientScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry,
this::onScrollResponse, this::finishHim, client,
mainRequest.getSearchRequest(), null, null);
}

/**
@@ -235,19 +239,33 @@ public void start() {
}
try {
startTime.set(System.nanoTime());
scrollSource.start(response -> onScrollResponse(timeValueNanos(System.nanoTime()), 0, response));
scrollSource.start();
} catch (Exception e) {
finishHim(e);
}
}

void onScrollResponse(ScrollableHitSource.AsyncResponse asyncResponse) {
// lastBatchStartTime is essentially unused (see WorkerBulkByScrollTaskState.throttleWaitTime). Leaving it for now, since it seems
// like a bug?
// The current delay is calculated from lastBatchSize, which is the number of bulk requests we send. But given that scripting
// must be involved in skipping bulk requests, it seems unfair to simply discount them completely. Makes more sense to me to count
// them in.
// I am also inclined to redo the way scroll search timeout adjustment works to just be (throttle-factor * batch-size)?
onScrollResponse(new TimeValue(System.nanoTime()), this.lastBatchSize, asyncResponse);
// this changes throttling to count all responses rather than only surviving requests after scripting. More work needed depending
// on the decision.
this.lastBatchSize = asyncResponse.response().getHits().size();
}
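
The comment above argues the delay should be derived from the full scroll response size. As a rough, illustrative sketch only (the names requestsPerSecond and lastScrollHits are assumptions, not the actual WorkerBulkByScrollTaskState fields), the wait time it has in mind is roughly:

```java
import java.util.concurrent.TimeUnit;

class ThrottleMathSketch {
    /**
     * Delay before starting the next scroll so that, on average, no more than requestsPerSecond
     * hits are processed. Counts all hits from the last scroll response, including the ones a
     * script later turned into noops.
     */
    static long throttleWaitNanos(long batchStartNanos, long nowNanos, int lastScrollHits, float requestsPerSecond) {
        if (requestsPerSecond == Float.POSITIVE_INFINITY || lastScrollHits == 0) {
            return 0L;
        }
        // Time the batch "should" have taken if perfectly throttled, minus the time it actually took.
        long targetNanos = (long) (lastScrollHits / requestsPerSecond * TimeUnit.SECONDS.toNanos(1));
        long elapsedNanos = nowNanos - batchStartNanos;
        return Math.max(0L, targetNanos - elapsedNanos);
    }
}
```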

/**
* Process a scroll response.
* @param lastBatchStartTime the time when the last batch started. Used to calculate the throttling delay.
* @param lastBatchSize the size of the last batch. Used to calculate the throttling delay.
* @param response the scroll response to process
* @param asyncResponse the scroll response to process
*/
void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.Response response) {
void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) {
ScrollableHitSource.Response response = asyncResponse.response();
logger.debug("[{}]: got scroll response with [{}] hits", task.getId(), response.getHits().size());
if (task.isCancelled()) {
logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
@@ -274,7 +292,7 @@ protected void doRun() throws Exception {
* It is important that the batch start time be calculated from here, scroll response to scroll response. That way the time
* waiting on the scroll doesn't count against this batch in the throttle.
*/
prepareBulkRequest(timeValueNanos(System.nanoTime()), response);
prepareBulkRequest(timeValueNanos(System.nanoTime()), asyncResponse);
}

@Override
@@ -291,7 +309,8 @@ public void onFailure(Exception e) {
* delay has been slept. Uses the generic thread pool because reindex is rare enough not to need its own thread pool and because the
* thread may be blocked by the user script.
*/
void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.Response response) {
void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse) {
ScrollableHitSource.Response response = asyncResponse.response();
logger.debug("[{}]: preparing bulk request", task.getId());
if (task.isCancelled()) {
logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
@@ -313,21 +332,32 @@ void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.Respon
}
BulkRequest request = buildBulk(hits);
if (request.requests().isEmpty()) {
if (task.isCancelled()) {
logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
finishHim(null);
return;
}
/*
* If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
*/
startNextScroll(thisBatchStartTime, timeValueNanos(System.nanoTime()), 0);
notifyDone(thisBatchStartTime, asyncResponse);
return;
}
request.timeout(mainRequest.getTimeout());
request.waitForActiveShards(mainRequest.getWaitForActiveShards());
sendBulkRequest(thisBatchStartTime, request);
sendBulkRequest(thisBatchStartTime, request, () -> notifyDone(thisBatchStartTime, asyncResponse));
}

private void notifyDone(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse) {
// here we also use the hit count and not the request count for now.
asyncResponse.done(worker.throttleWaitTime(thisBatchStartTime,
timeValueNanos(System.nanoTime()), asyncResponse.response().getHits().size()));
}

/**
* Send a bulk request, handling retries.
*/
void sendBulkRequest(TimeValue thisBatchStartTime, BulkRequest request) {
void sendBulkRequest(TimeValue thisBatchStartTime, BulkRequest request, Runnable onSuccess) {
if (logger.isDebugEnabled()) {
logger.debug("[{}]: sending [{}] entry, [{}] bulk request", task.getId(), request.requests().size(),
new ByteSizeValue(request.estimatedSizeInBytes()));
@@ -340,7 +370,7 @@ void sendBulkRequest(TimeValue thisBatchStartTime, BulkRequest request) {
bulkRetry.withBackoff(client::bulk, request, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
onBulkResponse(thisBatchStartTime, response);
onBulkResponse(thisBatchStartTime, response, onSuccess);
}

@Override
@@ -353,7 +383,7 @@ public void onFailure(Exception e) {
/**
* Processes bulk responses, accounting for failures.
*/
void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) {
void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response, Runnable onSuccess) {
try {
List<Failure> failures = new ArrayList<>();
Set<String> destinationIndicesThisBatch = new HashSet<>();
@@ -401,7 +431,13 @@ void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) {
return;
}

startNextScroll(thisBatchStartTime, timeValueNanos(System.nanoTime()), response.getItems().length);
if (task.isCancelled()) {
logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
finishHim(null);
return;
}

onSuccess.run();
} catch (Exception t) {
finishHim(t);
}
@@ -414,15 +450,8 @@ void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) {
* when the scroll returns
*/
void startNextScroll(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
if (task.isCancelled()) {
logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
finishHim(null);
return;
}
TimeValue extraKeepAlive = worker.throttleWaitTime(lastBatchStartTime, now, lastBatchSize);
scrollSource.startNextScroll(extraKeepAlive, response -> {
onScrollResponse(lastBatchStartTime, lastBatchSize, response);
});
/// did not bother cleaning up tests yet.
throw new UnsupportedOperationException();
}

private void recordFailure(Failure failure, List<Failure> failures) {
@@ -52,6 +52,10 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.lucene.uid.Versions;
import org.elasticsearch.common.regex.Regex;
@@ -64,11 +68,15 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.VersionFieldMapper;
import org.elasticsearch.index.reindex.ScrollableHitSource.SearchFailure;
import org.elasticsearch.index.reindex.remote.RemoteScrollableHitSource;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
@@ -100,13 +108,15 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
private final Client client;
private final CharacterRunAutomaton remoteWhitelist;
private final IndexNameExpressionResolver indexNameExpressionResolver;
private final NamedWriteableRegistry registry;

private final ReindexSslConfig sslConfig;

@Inject
public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver, ClusterService clusterService, ScriptService scriptService,
AutoCreateIndex autoCreateIndex, Client client, TransportService transportService, ReindexSslConfig sslConfig) {
AutoCreateIndex autoCreateIndex, Client client, TransportService transportService, ReindexSslConfig sslConfig,
NamedWriteableRegistry registry) {
super(ReindexAction.NAME, transportService, actionFilters, (Writeable.Reader<ReindexRequest>)ReindexRequest::new);
this.threadPool = threadPool;
this.clusterService = clusterService;
@@ -116,6 +126,7 @@ public TransportReindexAction(Settings settings, ThreadPool threadPool, ActionFi
remoteWhitelist = buildRemoteWhitelist(REMOTE_CLUSTER_WHITELIST.get(settings));
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.sslConfig = sslConfig;
this.registry = registry;
}

@Override
@@ -125,19 +136,48 @@ protected void doExecute(Task task, ReindexRequest request, ActionListener<BulkB
validateAgainstAliases(request.getSearchRequest(), request.getDestination(), request.getRemoteInfo(),
indexNameExpressionResolver, autoCreateIndex, state);

// Notice that this is called both on leader and workers when slicing.
String resumableSortingField = request.getRemoteInfo() == null ? getOrAddResumableSortingField(request.getSearchRequest()) : null;

BulkByScrollTask bulkByScrollTask = (BulkByScrollTask) task;

BulkByScrollParallelizationHelper.startSlicedAction(request, bulkByScrollTask, ReindexAction.INSTANCE, listener, client,
clusterService.localNode(),
() -> {
ParentTaskAssigningClient assigningClient = new ParentTaskAssigningClient(client, clusterService.localNode(),
bulkByScrollTask);
new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request, state,
listener).start();
new AsyncIndexBySearchAction(bulkByScrollTask, logger, assigningClient, threadPool, this, request,
resumableSortingField, registry, listener).start();
}
);
}

private String getOrAddResumableSortingField(SearchRequest searchRequest) {
// we keep with the tradition of modifying the input request, though this can lead to strange results (in transport clients).
List<SortBuilder<?>> sorts = searchRequest.source().sorts();
if (sorts != null && sorts.size() >= 1) {
SortBuilder<?> firstSort = sorts.get(0);
if (firstSort instanceof FieldSortBuilder) {
FieldSortBuilder fieldSort = (FieldSortBuilder) firstSort;
if (SeqNoFieldMapper.NAME.equals(fieldSort.getFieldName())
&& fieldSort.order() == SortOrder.ASC) {
// this ensures parallel sub tasks do not modify the request - though this would work with current request impl, this
// seems safer.
if (searchRequest.source().seqNoAndPrimaryTerm() == false) {
searchRequest.source().seqNoAndPrimaryTerm(true);
}
return SeqNoFieldMapper.NAME;
}
// todo: support non seq_no fields and descending, but need to check field is numeric and handle missing values too then.
}
return null;
}

searchRequest.source().sort(SeqNoFieldMapper.NAME);
searchRequest.source().seqNoAndPrimaryTerm(true);
return SeqNoFieldMapper.NAME;
}
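
For a request that arrives without an explicit sort, the effect of getOrAddResumableSortingField is roughly equivalent to building the search source as below. This is a hedged illustration, not code from this PR; the index name and wrapper class are made up.

```java
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;

class ResumableSortSketch {
    static SearchRequest resumableSearchRequest(String index) {
        SearchSourceBuilder source = new SearchSourceBuilder()
            .sort(SeqNoFieldMapper.NAME, SortOrder.ASC) // ascending _seq_no sort marks the request as resumable
            .seqNoAndPrimaryTerm(true);                 // ask for seq_no/primary_term on every hit
        return new SearchRequest(index).source(source);
    }
}
```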

static void checkRemoteWhitelist(CharacterRunAutomaton whitelist, RemoteInfo remoteInfo) {
if (remoteInfo == null) {
return;
@@ -246,13 +286,28 @@ static RestClient buildRestClient(RemoteInfo remoteInfo, ReindexSslConfig sslCon
return builder.build();
}

// todo: is there a better way? Also done in TransportRollupSearchAction.
private static SearchRequest cloneSearchRequest(SearchRequest original, NamedWriteableRegistry namedWriteableRegistry) {
try (BytesStreamOutput output = new BytesStreamOutput()) {
original.writeTo(output);
try (StreamInput in = new NamedWriteableAwareStreamInput(output.bytes().streamInput(), namedWriteableRegistry)) {
return new SearchRequest(in);
}
} catch (IOException e) {
assert false : "unexpected IOException: " + e.getMessage();
throw new RuntimeException(e);
}
}
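
The clone helper is presumably what lets a hit source rebuild the search after a failure and resume past the last processed _seq_no. The sketch below shows one way that could look; it is not the ClientScrollableHitSource implementation from this PR, and the helper shape is an assumption. The cloner parameter mirrors the this::cloneSearchRequest reference handed to ClientScrollableHitSource further down.

```java
import java.util.function.UnaryOperator;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;

class ResumeSketch {
    /**
     * Derive a restartable request: clone the original (e.g. via the serialization round-trip
     * above) and narrow it to hits strictly after the last seq_no that was already processed.
     */
    static SearchRequest resumeAfter(SearchRequest original, long lastSeenSeqNo, UnaryOperator<SearchRequest> cloner) {
        SearchRequest resumed = cloner.apply(original);
        QueryBuilder originalQuery = resumed.source().query();
        QueryBuilder after = QueryBuilders.rangeQuery(SeqNoFieldMapper.NAME).gt(lastSeenSeqNo);
        resumed.source().query(originalQuery == null
            ? after
            : QueryBuilders.boolQuery().must(originalQuery).filter(after));
        return resumed;
    }
}
```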

/**
* Simple implementation of reindex using scrolling and bulk. There are tons
* of optimizations that can be done on certain types of reindex requests
* but this makes no attempt to do any of them so it can be as simple as
* possible.
*/
static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<ReindexRequest, TransportReindexAction> {
private final String resumableSortingField;
private final NamedWriteableRegistry registry;
/**
* List of threads created by this process. Usually actions don't create threads in Elasticsearch. Instead they use the builtin
* {@link ThreadPool}s. But reindex-from-remote uses Elasticsearch's {@link RestClient} which doesn't use the
@@ -262,15 +317,19 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Re
private List<Thread> createdThreads = emptyList();

AsyncIndexBySearchAction(BulkByScrollTask task, Logger logger, ParentTaskAssigningClient client,
ThreadPool threadPool, TransportReindexAction action, ReindexRequest request, ClusterState clusterState,
ThreadPool threadPool, TransportReindexAction action, ReindexRequest request, String resumableSortingField,
NamedWriteableRegistry registry,
ActionListener<BulkByScrollResponse> listener) {
super(task,
/*
* We only need the source version if we're going to use it when write and we only do that when the destination request uses
* external versioning.
*/
request.getDestination().versionType() != VersionType.INTERNAL,
false, logger, client, threadPool, action, request, listener);
SeqNoFieldMapper.NAME.equals(resumableSortingField), logger, client, threadPool, action, request, listener);

this.resumableSortingField = resumableSortingField;
this.registry = registry;
}

@Override
Expand All @@ -279,10 +338,18 @@ protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffP
RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
createdThreads = synchronizedList(new ArrayList<>());
RestClient restClient = buildRestClient(remoteInfo, mainAction.sslConfig, task.getId(), createdThreads);
return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry, this::finishHim,
return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry,
this::onScrollResponse, this::finishHim,
restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest());
}
return super.buildScrollableResultSource(backoffPolicy);

return new ClientScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry,
this::onScrollResponse, this::finishHim, client,
mainRequest.getSearchRequest(), resumableSortingField, this::cloneSearchRequest);
}

private SearchRequest cloneSearchRequest(SearchRequest searchRequest) {
return TransportReindexAction.cloneSearchRequest(searchRequest, registry);
}

@Override