Reindex ScrollableHitSource pump data out #43864

Merged
@@ -112,6 +112,7 @@ public abstract class AbstractAsyncBulkByScrollAction<Request extends AbstractBu
      * {@link RequestWrapper} completely.
      */
     private final BiFunction<RequestWrapper<?>, ScrollableHitSource.Hit, RequestWrapper<?>> scriptApplier;
+    private int lastBatchSize;
 
     public AbstractAsyncBulkByScrollAction(BulkByScrollTask task, boolean needsSourceDocumentVersions,
             boolean needsSourceDocumentSeqNoAndPrimaryTerm, Logger logger, ParentTaskAssigningClient client,
@@ -211,7 +212,8 @@ private BulkRequest buildBulk(Iterable<? extends ScrollableHitSource.Hit> docs)
     }
 
     protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffPolicy) {
-        return new ClientScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry, this::finishHim, client,
+        return new ClientScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry,
+            this::onScrollResponse, this::finishHim, client,
             mainRequest.getSearchRequest());
     }
 
@@ -235,19 +237,26 @@ public void start() {
         }
         try {
             startTime.set(System.nanoTime());
-            scrollSource.start(response -> onScrollResponse(timeValueNanos(System.nanoTime()), 0, response));
+            scrollSource.start();
         } catch (Exception e) {
             finishHim(e);
         }
     }
 
+    void onScrollResponse(ScrollableHitSource.AsyncResponse asyncResponse) {
+        // lastBatchStartTime is essentially unused (see WorkerBulkByScrollTaskState.throttleWaitTime). Leaving it for now, since it seems
+        // like a bug?
+        onScrollResponse(new TimeValue(System.nanoTime()), this.lastBatchSize, asyncResponse);
+    }
+
     /**
      * Process a scroll response.
      * @param lastBatchStartTime the time when the last batch started. Used to calculate the throttling delay.
      * @param lastBatchSize the size of the last batch. Used to calculate the throttling delay.
-     * @param response the scroll response to process
+     * @param asyncResponse the response to process from ScrollableHitSource
      */
-    void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.Response response) {
+    void onScrollResponse(TimeValue lastBatchStartTime, int lastBatchSize, ScrollableHitSource.AsyncResponse asyncResponse) {
+        ScrollableHitSource.Response response = asyncResponse.response();
         logger.debug("[{}]: got scroll response with [{}] hits", task.getId(), response.getHits().size());
         if (task.isCancelled()) {
             logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
@@ -274,7 +283,7 @@ protected void doRun() throws Exception {
                 * It is important that the batch start time be calculated from here, scroll response to scroll response. That way the time
                 * waiting on the scroll doesn't count against this batch in the throttle.
                 */
-                prepareBulkRequest(timeValueNanos(System.nanoTime()), response);
+                prepareBulkRequest(timeValueNanos(System.nanoTime()), asyncResponse);
            }
 
            @Override
@@ -291,7 +300,8 @@ public void onFailure(Exception e) {
      * delay has been slept. Uses the generic thread pool because reindex is rare enough not to need its own thread pool and because the
      * thread may be blocked by the user script.
      */
-    void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.Response response) {
+    void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse) {
+        ScrollableHitSource.Response response = asyncResponse.response();
         logger.debug("[{}]: preparing bulk request", task.getId());
         if (task.isCancelled()) {
             logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
@@ -316,18 +326,18 @@ void prepareBulkRequest(TimeValue thisBatchStartTime, ScrollableHitSource.Respon
             /*
              * If we noop-ed the entire batch then just skip to the next batch or the BulkRequest would fail validation.
              */
-            startNextScroll(thisBatchStartTime, timeValueNanos(System.nanoTime()), 0);
+            notifyDone(thisBatchStartTime, asyncResponse, 0);
             return;
         }
         request.timeout(mainRequest.getTimeout());
         request.waitForActiveShards(mainRequest.getWaitForActiveShards());
-        sendBulkRequest(thisBatchStartTime, request);
+        sendBulkRequest(request, () -> notifyDone(thisBatchStartTime, asyncResponse, request.requests().size()));
     }
 
     /**
      * Send a bulk request, handling retries.
      */
-    void sendBulkRequest(TimeValue thisBatchStartTime, BulkRequest request) {
+    void sendBulkRequest(BulkRequest request, Runnable onSuccess) {
         if (logger.isDebugEnabled()) {
             logger.debug("[{}]: sending [{}] entry, [{}] bulk request", task.getId(), request.requests().size(),
                 new ByteSizeValue(request.estimatedSizeInBytes()));
@@ -340,7 +350,7 @@ void sendBulkRequest(TimeValue thisBatchStartTime, BulkRequest request) {
         bulkRetry.withBackoff(client::bulk, request, new ActionListener<BulkResponse>() {
             @Override
             public void onResponse(BulkResponse response) {
-                onBulkResponse(thisBatchStartTime, response);
+                onBulkResponse(response, onSuccess);
             }
 
             @Override
@@ -353,7 +363,7 @@ public void onFailure(Exception e) {
     /**
      * Processes bulk responses, accounting for failures.
      */
-    void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) {
+    void onBulkResponse(BulkResponse response, Runnable onSuccess) {
         try {
             List<Failure> failures = new ArrayList<>();
             Set<String> destinationIndicesThisBatch = new HashSet<>();
@@ -401,28 +411,20 @@ void onBulkResponse(TimeValue thisBatchStartTime, BulkResponse response) {
                 return;
             }
 
-            startNextScroll(thisBatchStartTime, timeValueNanos(System.nanoTime()), response.getItems().length);
+            onSuccess.run();
         } catch (Exception t) {
             finishHim(t);
         }
     }
 
-    /**
-     * Start the next scroll request.
-     *
-     * @param lastBatchSize the number of requests sent in the last batch. This is used to calculate the throttling values which are applied
-     *        when the scroll returns
-     */
-    void startNextScroll(TimeValue lastBatchStartTime, TimeValue now, int lastBatchSize) {
+    void notifyDone(TimeValue thisBatchStartTime, ScrollableHitSource.AsyncResponse asyncResponse, int batchSize) {
         if (task.isCancelled()) {
             logger.debug("[{}]: finishing early because the task was cancelled", task.getId());
             finishHim(null);
             return;
         }
-        TimeValue extraKeepAlive = worker.throttleWaitTime(lastBatchStartTime, now, lastBatchSize);
-        scrollSource.startNextScroll(extraKeepAlive, response -> {
-            onScrollResponse(lastBatchStartTime, lastBatchSize, response);
-        });
+        this.lastBatchSize = batchSize;
+        asyncResponse.done(worker.throttleWaitTime(thisBatchStartTime, timeValueNanos(System.nanoTime()), batchSize));
     }
 
     private void recordFailure(Failure failure, List<Failure> failures) {
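To make the control-flow change above easier to follow, here is a minimal, self-contained sketch of the "pump" model this PR moves to: the hit source pushes each batch to a consumer supplied at construction time, and the consumer calls done(extraKeepAlive) once the bulk indexing for that batch has finished, which triggers the next scroll. ToyHitSource, AsyncBatch, and Batch are simplified stand-ins invented for this illustration, not the Elasticsearch classes; only the shape of the callback mirrors the diff.

```java
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-ins for the types touched by this PR; the real classes live in
// org.elasticsearch.index.reindex and carry far more state (retries, throttling, stats).
public final class PumpSketch {

    /** Stand-in for ScrollableHitSource.Response: just the hits of one scroll batch. */
    record Batch(List<String> hits) {}

    /** Stand-in for ScrollableHitSource.AsyncResponse: the batch plus a way to ask for the next one. */
    interface AsyncBatch {
        Batch response();
        void done(long extraKeepAliveMillis); // mirrors done(TimeValue extraKeepAlive) in the diff
    }

    /** Stand-in for the hit source: it now pushes batches into a consumer instead of being polled. */
    static final class ToyHitSource {
        private final Consumer<AsyncBatch> onResponse;
        private final List<Batch> batches;
        private int next = 0;

        ToyHitSource(Consumer<AsyncBatch> onResponse, List<Batch> batches) {
            this.onResponse = onResponse;
            this.batches = batches;
        }

        void start() { // was start(Consumer<Response>) before this change
            pump();
        }

        private void pump() {
            if (next >= batches.size()) {
                return; // nothing left to scroll
            }
            Batch batch = batches.get(next++);
            onResponse.accept(new AsyncBatch() {
                @Override public Batch response() { return batch; }
                @Override public void done(long extraKeepAliveMillis) {
                    pump(); // the real source issues the next scroll here, stretching the keep-alive by the throttle wait
                }
            });
        }
    }

    public static void main(String[] args) {
        // The consumer plays the role of AbstractAsyncBulkByScrollAction#onScrollResponse:
        // process the batch ("send the bulk request"), then hand control back via done(...),
        // just as notifyDone(...) above ends with asyncResponse.done(throttleWaitTime).
        ToyHitSource source = new ToyHitSource(async -> {
            System.out.println("indexing batch: " + async.response().hits());
            async.done(0);
        }, List.of(new Batch(List.of("a", "b")), new Batch(List.of("c"))));
        source.start();
    }
}
```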
(next changed file)
@@ -279,7 +279,8 @@ protected ScrollableHitSource buildScrollableResultSource(BackoffPolicy backoffP
             RemoteInfo remoteInfo = mainRequest.getRemoteInfo();
             createdThreads = synchronizedList(new ArrayList<>());
             RestClient restClient = buildRestClient(remoteInfo, mainAction.sslConfig, task.getId(), createdThreads);
-            return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry, this::finishHim,
+            return new RemoteScrollableHitSource(logger, backoffPolicy, threadPool, worker::countSearchRetry,
+                this::onScrollResponse, this::finishHim,
                 restClient, remoteInfo.getQuery(), mainRequest.getSearchRequest());
         }
         return super.buildScrollableResultSource(backoffPolicy);
(next changed file)
@@ -30,7 +30,6 @@
 import org.elasticsearch.ElasticsearchStatusException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.bulk.BackoffPolicy;
-import org.elasticsearch.index.reindex.ScrollableHitSource;
 import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.client.Request;
 import org.elasticsearch.client.ResponseException;
@@ -44,9 +43,10 @@
 import org.elasticsearch.common.util.concurrent.ThreadContext;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
-import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentParseException;
+import org.elasticsearch.common.xcontent.XContentParser;
 import org.elasticsearch.common.xcontent.XContentType;
+import org.elasticsearch.index.reindex.ScrollableHitSource;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.threadpool.ThreadPool;
 
@@ -68,8 +68,9 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
     Version remoteVersion;
 
     public RemoteScrollableHitSource(Logger logger, BackoffPolicy backoffPolicy, ThreadPool threadPool, Runnable countSearchRetry,
-                                     Consumer<Exception> fail, RestClient client, BytesReference query, SearchRequest searchRequest) {
-        super(logger, backoffPolicy, threadPool, countSearchRetry, fail);
+                                     Consumer<AsyncResponse> onResponse, Consumer<Exception> fail,
+                                     RestClient client, BytesReference query, SearchRequest searchRequest) {
+        super(logger, backoffPolicy, threadPool, countSearchRetry, onResponse, fail);
         this.query = query;
         this.searchRequest = searchRequest;
         this.client = client;
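The ScrollableHitSource base class itself is not among the hunks captured here, so the following is only a hedged sketch of what the new constructor argument implies for the superclass: it presumably stores the Consumer<AsyncResponse> and wraps each scroll response so that done(extraKeepAlive) starts the next scroll. The class and method names below (SketchScrollableHitSource, deliver, startNextScroll) are invented for illustration and may not match the real implementation.

```java
import java.util.function.Consumer;

// Hypothetical illustration only; the real ScrollableHitSource lives in
// org.elasticsearch.index.reindex and its fields, methods, and threading differ.
abstract class SketchScrollableHitSource {

    /** Stand-in for ScrollableHitSource.Response. */
    public static class Response {}

    /** Mirrors the AsyncResponse type used throughout the diff. */
    public interface AsyncResponse {
        Response response();
        void done(long extraKeepAliveMillis);
    }

    private final Consumer<AsyncResponse> onResponse; // the new constructor argument added by this PR
    private final Consumer<Exception> fail;

    protected SketchScrollableHitSource(Consumer<AsyncResponse> onResponse, Consumer<Exception> fail) {
        this.onResponse = onResponse;
        this.fail = fail;
    }

    /** Subclasses (client- or remote-backed) would call this when a scroll response arrives. */
    protected final void deliver(Response response) {
        onResponse.accept(new AsyncResponse() {
            @Override public Response response() { return response; }
            @Override public void done(long extraKeepAliveMillis) {
                startNextScroll(extraKeepAliveMillis); // pump the next batch once the consumer is finished
            }
        });
    }

    protected final void onFailure(Exception e) {
        fail.accept(e);
    }

    /** Issue the next scroll request, stretching the keep-alive by the throttle wait time. */
    protected abstract void startNextScroll(long extraKeepAliveMillis);
}
```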