Skip to content

Commit

Permalink
Reindex rethrottle persistent task (#51599)
Browse files Browse the repository at this point in the history
* Reindex rethrottle persistent task

This adds support for rethrottling resilient/persistent reindex
through updating the .reindex index and notifying the task. This
ensures that the new throttle value sticks on failovers while
also ensuring that the task wakes up immediately if it had a very
low throttle value.

Related to #42612
  • Loading branch information
henningandersen authored Feb 13, 2020
1 parent f133e16 commit 5c66735
Show file tree
Hide file tree
Showing 20 changed files with 442 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public class ReindexPlugin extends Plugin implements ActionPlugin, PersistentTas
new ActionHandler<>(UpdateByQueryAction.INSTANCE, TransportUpdateByQueryAction.class),
new ActionHandler<>(DeleteByQueryAction.INSTANCE, TransportDeleteByQueryAction.class),
new ActionHandler<>(RethrottleAction.INSTANCE, TransportRethrottleAction.class),
new ActionHandler<>(StartReindexTaskAction.INSTANCE, TransportStartReindexTaskAction.class)
new ActionHandler<>(StartReindexTaskAction.INSTANCE, TransportStartReindexTaskAction.class),
new ActionHandler<>(RethrottlePersistentReindexAction.INSTANCE, TransportRethrottlePersistentReindexAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,8 @@ BulkByScrollTask getChildTask() {

private void execute(ReindexTaskParams reindexTaskParams) {
long allocationId = getAllocationId();

ReindexTaskStateUpdater taskUpdater = new ReindexTaskStateUpdater(reindexIndexClient, client.threadPool(), getPersistentTaskId(),
allocationId, new ActionListener<>() {
allocationId, taskId, new ActionListener<>() {
@Override
public void onResponse(ReindexTaskStateDoc stateDoc) {
reindexDone(stateDoc, reindexTaskParams.shouldStoreResult());
Expand All @@ -158,14 +157,14 @@ public void onFailure(Exception e) {
taskUpdater.assign(new ActionListener<>() {
@Override
public void onResponse(ReindexTaskStateDoc stateDoc) {
ReindexRequest reindexRequest = stateDoc.getReindexRequest();
ReindexRequest reindexRequest = stateDoc.getRethrottledReindexRequest();
description = reindexRequest.getDescription();
reindexer.initTask(childTask, reindexRequest, new ActionListener<>() {
@Override
public void onResponse(Void aVoid) {
// TODO: need to store status in state so we can continue from it.
transientStatus = childTask.getStatus();
performReindex(reindexTaskParams, stateDoc, taskUpdater);
performReindex(reindexRequest, reindexTaskParams, stateDoc, taskUpdater);
}

@Override
Expand Down Expand Up @@ -239,8 +238,8 @@ public void onFailure(Exception e) {
});
}

private void performReindex(ReindexTaskParams reindexTaskParams, ReindexTaskStateDoc stateDoc, ReindexTaskStateUpdater taskUpdater) {
ReindexRequest reindexRequest = stateDoc.getReindexRequest();
private void performReindex(ReindexRequest reindexRequest, ReindexTaskParams reindexTaskParams, ReindexTaskStateDoc stateDoc,
ReindexTaskStateUpdater taskUpdater) {
ScrollableHitSource.Checkpoint initialCheckpoint = stateDoc.getCheckpoint();
ThreadContext threadContext = client.threadPool().getThreadContext();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;

Expand All @@ -35,63 +36,78 @@ public class ReindexTaskStateDoc implements ToXContentObject {

public static final ConstructingObjectParser<ReindexTaskStateDoc, Void> PARSER =
new ConstructingObjectParser<>("reindex/index_state", a -> new ReindexTaskStateDoc((ReindexRequest) a[0], (Boolean) a[1],
(Long) a[2], (BulkByScrollResponse) a[3], (ElasticsearchException) a[4], (Integer) a[5],
(ScrollableHitSource.Checkpoint) a[6]));
(Long) a[2], toTaskId((String) a[3]), (BulkByScrollResponse) a[4], (ElasticsearchException) a[5], (Integer) a[6],
(ScrollableHitSource.Checkpoint) a[7], (float) a[8]));

private static final String REINDEX_REQUEST = "request";
private static final String RESILIENT = "resilient";
private static final String ALLOCATION = "allocation";
private static final String EPHEMERAL_TASK_ID = "ephemeral_task_id";
private static final String REINDEX_RESPONSE = "response";
private static final String REINDEX_EXCEPTION = "exception";
private static final String FAILURE_REST_STATUS = "failure_rest_status";
private static final String REINDEX_CHECKPOINT = "checkpoint";
private static final String REQUESTS_PER_SECOND = "requests_per_second";

static {
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ReindexRequest.fromXContentWithParams(p),
new ParseField(REINDEX_REQUEST));
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), new ParseField(RESILIENT));
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), new ParseField(ALLOCATION));
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), new ParseField(EPHEMERAL_TASK_ID));
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> BulkByScrollResponse.fromXContent(p),
new ParseField(REINDEX_RESPONSE));
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> ElasticsearchException.fromXContent(p),
new ParseField(REINDEX_EXCEPTION));
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), new ParseField(FAILURE_REST_STATUS));
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> ScrollableHitSource.Checkpoint.fromXContent(p),
new ParseField(REINDEX_CHECKPOINT));
PARSER.declareFloat(ConstructingObjectParser.constructorArg(), new ParseField(REQUESTS_PER_SECOND));
}

private final ReindexRequest reindexRequest;
private final boolean resilient;
private final Long allocationId;
private final TaskId ephemeralTaskId;
private final BulkByScrollResponse reindexResponse;
private final ElasticsearchException exception;
private final RestStatus failureStatusCode;
private final ScrollableHitSource.Checkpoint checkpoint;
private final float requestsPerSecond;

public ReindexTaskStateDoc(ReindexRequest reindexRequest, boolean resilient) {
this(reindexRequest, resilient, null, null, null, (RestStatus) null, null);
this(reindexRequest, resilient, null, null, null, null, (RestStatus) null, null, reindexRequest.getRequestsPerSecond());
}

private ReindexTaskStateDoc(ReindexRequest reindexRequest, boolean resilient, @Nullable Long allocationId,
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
@Nullable Integer failureStatusCode, ScrollableHitSource.Checkpoint checkpoint) {
this(reindexRequest, resilient, allocationId, reindexResponse, exception,
failureStatusCode == null ? null : RestStatus.fromCode(failureStatusCode), checkpoint);
@Nullable TaskId ephemeralTaskId, @Nullable BulkByScrollResponse reindexResponse,
@Nullable ElasticsearchException exception,
@Nullable Integer failureStatusCode, ScrollableHitSource.Checkpoint checkpoint, float requestsPerSecond) {
this(reindexRequest, resilient, allocationId, ephemeralTaskId, reindexResponse, exception,
failureStatusCode == null ? null : RestStatus.fromCode(failureStatusCode), checkpoint, requestsPerSecond);
}

private ReindexTaskStateDoc(ReindexRequest reindexRequest, boolean resilient, @Nullable Long allocationId,
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
@Nullable ScrollableHitSource.Checkpoint checkpoint) {
this(reindexRequest, resilient, allocationId, reindexResponse, exception, exception != null ? exception.status() : null,
checkpoint);
@Nullable TaskId ephemeralTaskId, @Nullable BulkByScrollResponse reindexResponse,
@Nullable ElasticsearchException exception,
@Nullable ScrollableHitSource.Checkpoint checkpoint, float requestsPerSecond) {
this(reindexRequest, resilient, allocationId, ephemeralTaskId,
reindexResponse, exception, exception != null ? exception.status() : null,
checkpoint, requestsPerSecond);
}

private ReindexTaskStateDoc(ReindexRequest reindexRequest, boolean resilient, @Nullable Long allocationId,
@Nullable TaskId ephemeralTaskId,
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
@Nullable RestStatus failureStatusCode, @Nullable ScrollableHitSource.Checkpoint checkpoint) {
@Nullable RestStatus failureStatusCode, @Nullable ScrollableHitSource.Checkpoint checkpoint,
float requestsPerSecond) {
this.reindexRequest = reindexRequest;
this.resilient = resilient;
assert (allocationId == null) == (ephemeralTaskId == null);
this.allocationId = allocationId;
this.ephemeralTaskId = ephemeralTaskId;
assert Float.isNaN(requestsPerSecond) == false && requestsPerSecond >= 0;
this.requestsPerSecond = requestsPerSecond;
assert (reindexResponse == null) || (exception == null) : "Either response or exception must be null";
this.reindexResponse = reindexResponse;
this.exception = exception;
Expand All @@ -108,6 +124,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (allocationId != null) {
builder.field(ALLOCATION, allocationId);
}
if (ephemeralTaskId != null) {
builder.field(EPHEMERAL_TASK_ID, ephemeralTaskId.toString());
}
if (reindexResponse != null) {
builder.field(REINDEX_RESPONSE);
builder.startObject();
Expand All @@ -125,13 +144,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(REINDEX_CHECKPOINT);
checkpoint.toXContent(builder, params);
}
builder.field(REQUESTS_PER_SECOND, requestsPerSecond);
return builder.endObject();
}

public static ReindexTaskStateDoc fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

private static TaskId toTaskId(String s) {
return s != null ? new TaskId(s) : null;
}

public ReindexRequest getReindexRequest() {
return reindexRequest;
}
Expand All @@ -140,6 +164,14 @@ public boolean isResilient() {
return resilient;
}

public ReindexRequest getRethrottledReindexRequest() {
if (reindexRequest.getRequestsPerSecond() != requestsPerSecond) {
return new ReindexRequest(reindexRequest).setRequestsPerSecond(requestsPerSecond);
} else {
return reindexRequest;
}
}

public BulkByScrollResponse getReindexResponse() {
return reindexResponse;
}
Expand All @@ -160,18 +192,33 @@ public Long getAllocationId() {
return allocationId;
}

public TaskId getEphemeralTaskId() {
return ephemeralTaskId;
}

public float getRequestsPerSecond() {
return requestsPerSecond;
}

public ReindexTaskStateDoc withCheckpoint(ScrollableHitSource.Checkpoint checkpoint, BulkByScrollTask.Status status) {
// todo: also store and resume from status.
return new ReindexTaskStateDoc(reindexRequest, resilient, allocationId, reindexResponse, exception, failureStatusCode, checkpoint);
return new ReindexTaskStateDoc(reindexRequest, resilient, allocationId, ephemeralTaskId,
reindexResponse, exception, failureStatusCode, checkpoint, requestsPerSecond);
}

public ReindexTaskStateDoc withNewAllocation(long newAllocationId) {
return new ReindexTaskStateDoc(reindexRequest, resilient, newAllocationId, reindexResponse, exception, failureStatusCode,
checkpoint);
public ReindexTaskStateDoc withNewAllocation(long newAllocationId, TaskId ephemeralTaskId) {
return new ReindexTaskStateDoc(reindexRequest, resilient, newAllocationId, ephemeralTaskId,
reindexResponse, exception, failureStatusCode, checkpoint, requestsPerSecond);
}

public ReindexTaskStateDoc withFinishedState(@Nullable BulkByScrollResponse reindexResponse,
@Nullable ElasticsearchException exception) {
return new ReindexTaskStateDoc(reindexRequest, resilient, allocationId, reindexResponse, exception, checkpoint);
return new ReindexTaskStateDoc(reindexRequest, resilient, allocationId, ephemeralTaskId, reindexResponse, exception, checkpoint,
requestsPerSecond);
}

public ReindexTaskStateDoc withRequestsPerSecond(float requestsPerSecond) {
return new ReindexTaskStateDoc(reindexRequest, resilient, allocationId, ephemeralTaskId, reindexResponse, exception, checkpoint,
requestsPerSecond);
}
}
Loading

0 comments on commit 5c66735

Please sign in to comment.