Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reindex rethrottle persistent task #51599

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public class ReindexPlugin extends Plugin implements ActionPlugin, PersistentTas
new ActionHandler<>(UpdateByQueryAction.INSTANCE, TransportUpdateByQueryAction.class),
new ActionHandler<>(DeleteByQueryAction.INSTANCE, TransportDeleteByQueryAction.class),
new ActionHandler<>(RethrottleAction.INSTANCE, TransportRethrottleAction.class),
new ActionHandler<>(StartReindexTaskAction.INSTANCE, TransportStartReindexTaskAction.class)
new ActionHandler<>(StartReindexTaskAction.INSTANCE, TransportStartReindexTaskAction.class),
new ActionHandler<>(RethrottlePersistentReindexAction.INSTANCE, TransportRethrottlePersistentReindexAction.class)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,9 +140,8 @@ BulkByScrollTask getChildTask() {

private void execute(ReindexTaskParams reindexTaskParams) {
long allocationId = getAllocationId();

ReindexTaskStateUpdater taskUpdater = new ReindexTaskStateUpdater(reindexIndexClient, client.threadPool(), getPersistentTaskId(),
allocationId, new ActionListener<>() {
allocationId, taskId, new ActionListener<>() {
@Override
public void onResponse(ReindexTaskStateDoc stateDoc) {
reindexDone(stateDoc, reindexTaskParams.shouldStoreResult());
Expand All @@ -158,14 +157,14 @@ public void onFailure(Exception e) {
taskUpdater.assign(new ActionListener<>() {
@Override
public void onResponse(ReindexTaskStateDoc stateDoc) {
ReindexRequest reindexRequest = stateDoc.getReindexRequest();
ReindexRequest reindexRequest = stateDoc.getRethrottledReindexRequest();
description = reindexRequest.getDescription();
reindexer.initTask(childTask, reindexRequest, new ActionListener<>() {
@Override
public void onResponse(Void aVoid) {
// TODO: need to store status in state so we can continue from it.
transientStatus = childTask.getStatus();
performReindex(reindexTaskParams, stateDoc, taskUpdater);
performReindex(reindexRequest, reindexTaskParams, stateDoc, taskUpdater);
}

@Override
Expand Down Expand Up @@ -239,8 +238,8 @@ public void onFailure(Exception e) {
});
}

private void performReindex(ReindexTaskParams reindexTaskParams, ReindexTaskStateDoc stateDoc, ReindexTaskStateUpdater taskUpdater) {
ReindexRequest reindexRequest = stateDoc.getReindexRequest();
private void performReindex(ReindexRequest reindexRequest, ReindexTaskParams reindexTaskParams, ReindexTaskStateDoc stateDoc,
ReindexTaskStateUpdater taskUpdater) {
ScrollableHitSource.Checkpoint initialCheckpoint = stateDoc.getCheckpoint();
ThreadContext threadContext = client.threadPool().getThreadContext();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;

Expand All @@ -35,63 +36,78 @@ public class ReindexTaskStateDoc implements ToXContentObject {

public static final ConstructingObjectParser<ReindexTaskStateDoc, Void> PARSER =
new ConstructingObjectParser<>("reindex/index_state", a -> new ReindexTaskStateDoc((ReindexRequest) a[0], (Boolean) a[1],
(Long) a[2], (BulkByScrollResponse) a[3], (ElasticsearchException) a[4], (Integer) a[5],
(ScrollableHitSource.Checkpoint) a[6]));
(Long) a[2], toTaskId((String) a[3]), (BulkByScrollResponse) a[4], (ElasticsearchException) a[5], (Integer) a[6],
(ScrollableHitSource.Checkpoint) a[7], (float) a[8]));

private static final String REINDEX_REQUEST = "request";
private static final String RESILIENT = "resilient";
private static final String ALLOCATION = "allocation";
private static final String EPHEMERAL_TASK_ID = "ephemeral_task_id";
private static final String REINDEX_RESPONSE = "response";
private static final String REINDEX_EXCEPTION = "exception";
private static final String FAILURE_REST_STATUS = "failure_rest_status";
private static final String REINDEX_CHECKPOINT = "checkpoint";
private static final String REQUESTS_PER_SECOND = "requests_per_second";

static {
PARSER.declareObject(ConstructingObjectParser.constructorArg(), (p, c) -> ReindexRequest.fromXContentWithParams(p),
new ParseField(REINDEX_REQUEST));
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), new ParseField(RESILIENT));
PARSER.declareLong(ConstructingObjectParser.optionalConstructorArg(), new ParseField(ALLOCATION));
PARSER.declareString(ConstructingObjectParser.optionalConstructorArg(), new ParseField(EPHEMERAL_TASK_ID));
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> BulkByScrollResponse.fromXContent(p),
new ParseField(REINDEX_RESPONSE));
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> ElasticsearchException.fromXContent(p),
new ParseField(REINDEX_EXCEPTION));
PARSER.declareInt(ConstructingObjectParser.optionalConstructorArg(), new ParseField(FAILURE_REST_STATUS));
PARSER.declareObject(ConstructingObjectParser.optionalConstructorArg(), (p, c) -> ScrollableHitSource.Checkpoint.fromXContent(p),
new ParseField(REINDEX_CHECKPOINT));
PARSER.declareFloat(ConstructingObjectParser.constructorArg(), new ParseField(REQUESTS_PER_SECOND));
}

private final ReindexRequest reindexRequest;
private final boolean resilient;
private final Long allocationId;
private final TaskId ephemeralTaskId;
private final BulkByScrollResponse reindexResponse;
private final ElasticsearchException exception;
private final RestStatus failureStatusCode;
private final ScrollableHitSource.Checkpoint checkpoint;
private final float requestsPerSecond;

public ReindexTaskStateDoc(ReindexRequest reindexRequest, boolean resilient) {
this(reindexRequest, resilient, null, null, null, (RestStatus) null, null);
this(reindexRequest, resilient, null, null, null, null, (RestStatus) null, null, reindexRequest.getRequestsPerSecond());
}

private ReindexTaskStateDoc(ReindexRequest reindexRequest, boolean resilient, @Nullable Long allocationId,
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
@Nullable Integer failureStatusCode, ScrollableHitSource.Checkpoint checkpoint) {
this(reindexRequest, resilient, allocationId, reindexResponse, exception,
failureStatusCode == null ? null : RestStatus.fromCode(failureStatusCode), checkpoint);
@Nullable TaskId ephemeralTaskId, @Nullable BulkByScrollResponse reindexResponse,
@Nullable ElasticsearchException exception,
@Nullable Integer failureStatusCode, ScrollableHitSource.Checkpoint checkpoint, float requestsPerSecond) {
this(reindexRequest, resilient, allocationId, ephemeralTaskId, reindexResponse, exception,
failureStatusCode == null ? null : RestStatus.fromCode(failureStatusCode), checkpoint, requestsPerSecond);
}

private ReindexTaskStateDoc(ReindexRequest reindexRequest, boolean resilient, @Nullable Long allocationId,
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
@Nullable ScrollableHitSource.Checkpoint checkpoint) {
this(reindexRequest, resilient, allocationId, reindexResponse, exception, exception != null ? exception.status() : null,
checkpoint);
@Nullable TaskId ephemeralTaskId, @Nullable BulkByScrollResponse reindexResponse,
@Nullable ElasticsearchException exception,
@Nullable ScrollableHitSource.Checkpoint checkpoint, float requestsPerSecond) {
this(reindexRequest, resilient, allocationId, ephemeralTaskId,
reindexResponse, exception, exception != null ? exception.status() : null,
checkpoint, requestsPerSecond);
}

private ReindexTaskStateDoc(ReindexRequest reindexRequest, boolean resilient, @Nullable Long allocationId,
@Nullable TaskId ephemeralTaskId,
@Nullable BulkByScrollResponse reindexResponse, @Nullable ElasticsearchException exception,
@Nullable RestStatus failureStatusCode, @Nullable ScrollableHitSource.Checkpoint checkpoint) {
@Nullable RestStatus failureStatusCode, @Nullable ScrollableHitSource.Checkpoint checkpoint,
float requestsPerSecond) {
this.reindexRequest = reindexRequest;
this.resilient = resilient;
assert (allocationId == null) == (ephemeralTaskId == null);
this.allocationId = allocationId;
this.ephemeralTaskId = ephemeralTaskId;
assert Float.isNaN(requestsPerSecond) == false && requestsPerSecond >= 0;
this.requestsPerSecond = requestsPerSecond;
assert (reindexResponse == null) || (exception == null) : "Either response or exception must be null";
this.reindexResponse = reindexResponse;
this.exception = exception;
Expand All @@ -108,6 +124,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (allocationId != null) {
builder.field(ALLOCATION, allocationId);
}
if (ephemeralTaskId != null) {
builder.field(EPHEMERAL_TASK_ID, ephemeralTaskId.toString());
}
if (reindexResponse != null) {
builder.field(REINDEX_RESPONSE);
builder.startObject();
Expand All @@ -125,13 +144,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(REINDEX_CHECKPOINT);
checkpoint.toXContent(builder, params);
}
builder.field(REQUESTS_PER_SECOND, requestsPerSecond);
return builder.endObject();
}

public static ReindexTaskStateDoc fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

private static TaskId toTaskId(String s) {
return s != null ? new TaskId(s) : null;
}

public ReindexRequest getReindexRequest() {
return reindexRequest;
}
Expand All @@ -140,6 +164,14 @@ public boolean isResilient() {
return resilient;
}

public ReindexRequest getRethrottledReindexRequest() {
if (reindexRequest.getRequestsPerSecond() != requestsPerSecond) {
return new ReindexRequest(reindexRequest).setRequestsPerSecond(requestsPerSecond);
} else {
return reindexRequest;
}
}

public BulkByScrollResponse getReindexResponse() {
return reindexResponse;
}
Expand All @@ -160,18 +192,33 @@ public Long getAllocationId() {
return allocationId;
}

public TaskId getEphemeralTaskId() {
return ephemeralTaskId;
}

public float getRequestsPerSecond() {
return requestsPerSecond;
}

public ReindexTaskStateDoc withCheckpoint(ScrollableHitSource.Checkpoint checkpoint, BulkByScrollTask.Status status) {
// todo: also store and resume from status.
return new ReindexTaskStateDoc(reindexRequest, resilient, allocationId, reindexResponse, exception, failureStatusCode, checkpoint);
return new ReindexTaskStateDoc(reindexRequest, resilient, allocationId, ephemeralTaskId,
reindexResponse, exception, failureStatusCode, checkpoint, requestsPerSecond);
}

public ReindexTaskStateDoc withNewAllocation(long newAllocationId) {
return new ReindexTaskStateDoc(reindexRequest, resilient, newAllocationId, reindexResponse, exception, failureStatusCode,
checkpoint);
public ReindexTaskStateDoc withNewAllocation(long newAllocationId, TaskId ephemeralTaskId) {
return new ReindexTaskStateDoc(reindexRequest, resilient, newAllocationId, ephemeralTaskId,
reindexResponse, exception, failureStatusCode, checkpoint, requestsPerSecond);
}

public ReindexTaskStateDoc withFinishedState(@Nullable BulkByScrollResponse reindexResponse,
@Nullable ElasticsearchException exception) {
return new ReindexTaskStateDoc(reindexRequest, resilient, allocationId, reindexResponse, exception, checkpoint);
return new ReindexTaskStateDoc(reindexRequest, resilient, allocationId, ephemeralTaskId, reindexResponse, exception, checkpoint,
requestsPerSecond);
}

public ReindexTaskStateDoc withRequestsPerSecond(float requestsPerSecond) {
return new ReindexTaskStateDoc(reindexRequest, resilient, allocationId, ephemeralTaskId, reindexResponse, exception, checkpoint,
requestsPerSecond);
}
}
Loading