Skip to content

Commit

Permalink
Reindex naming changes
Browse files Browse the repository at this point in the history
Renamed types and action names to fit that we now call it a reindex task
and not a job. Removed action and named writeable todos.

Relates elastic#42612
  • Loading branch information
henningandersen committed Nov 22, 2019
1 parent 476eeb7 commit 281549f
Show file tree
Hide file tree
Showing 15 changed files with 91 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@

import java.io.IOException;

public class ReindexJobState implements Task.Status, PersistentTaskState {
public class ReindexPersistentTaskState implements Task.Status, PersistentTaskState {

// TODO: Name
public static final String NAME = ReindexTask.NAME;
public static final String NAME = "reindex_persistent_task_state";

public static final ConstructingObjectParser<ReindexJobState, Void> PARSER =
new ConstructingObjectParser<>(NAME, a -> new ReindexJobState((String) a[0], (String) a[1]));
public static final ConstructingObjectParser<ReindexPersistentTaskState, Void> PARSER =
new ConstructingObjectParser<>(NAME, a -> new ReindexPersistentTaskState((String) a[0], (String) a[1]));

private static String EPHEMERAL_TASK_ID = "ephemeral_task_id";
private static String STATUS = "status";
Expand All @@ -50,17 +49,17 @@ public class ReindexJobState implements Task.Status, PersistentTaskState {
private final TaskId ephemeralTaskId;
private final Status status;

private ReindexJobState(String ephemeralTaskId, String status) {
private ReindexPersistentTaskState(String ephemeralTaskId, String status) {
this(new TaskId(ephemeralTaskId), Status.valueOf(status));
}

ReindexJobState(TaskId ephemeralTaskId, Status status) {
ReindexPersistentTaskState(TaskId ephemeralTaskId, Status status) {
assert status != null : "Status cannot be null";
this.ephemeralTaskId = ephemeralTaskId;
this.status = status;
}

public ReindexJobState(StreamInput in) throws IOException {
public ReindexPersistentTaskState(StreamInput in) throws IOException {
ephemeralTaskId = TaskId.readFromStream(in);
status = in.readEnum(Status.class);
}
Expand Down Expand Up @@ -96,7 +95,7 @@ public TaskId getEphemeralTaskId() {
return ephemeralTaskId;
}

public static ReindexJobState fromXContent(XContentParser parser) {
public static ReindexPersistentTaskState fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,26 @@ public class ReindexPlugin extends Plugin implements ActionPlugin, PersistentTas
new ActionHandler<>(UpdateByQueryAction.INSTANCE, TransportUpdateByQueryAction.class),
new ActionHandler<>(DeleteByQueryAction.INSTANCE, TransportDeleteByQueryAction.class),
new ActionHandler<>(RethrottleAction.INSTANCE, TransportRethrottleAction.class),
new ActionHandler<>(StartReindexJobAction.INSTANCE, TransportStartReindexJobAction.class)
new ActionHandler<>(StartReindexTaskAction.INSTANCE, TransportStartReindexTaskAction.class)
);
}

@Override
public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
return Arrays.asList(
new NamedWriteableRegistry.Entry(Task.Status.class, BulkByScrollTask.Status.NAME, BulkByScrollTask.Status::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, ReindexJob.NAME, ReindexJob::new),
new NamedWriteableRegistry.Entry(Task.Status.class, ReindexJobState.NAME, ReindexJobState::new),
new NamedWriteableRegistry.Entry(PersistentTaskState.class, ReindexJobState.NAME, ReindexJobState::new));
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, ReindexTaskParams.NAME, ReindexTaskParams::new),
new NamedWriteableRegistry.Entry(Task.Status.class, ReindexPersistentTaskState.NAME, ReindexPersistentTaskState::new),
new NamedWriteableRegistry.Entry(PersistentTaskState.class, ReindexPersistentTaskState.NAME, ReindexPersistentTaskState::new));
}

@Override
public List<NamedXContentRegistry.Entry> getNamedXContent() {
return Arrays.asList(
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(ReindexJob.NAME), ReindexJob::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(ReindexJobState.NAME), ReindexJobState::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(ReindexJobState.NAME),
ReindexJobState::fromXContent));
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(ReindexTaskParams.NAME), ReindexTaskParams::fromXContent),
new NamedXContentRegistry.Entry(Task.Status.class, new ParseField(ReindexPersistentTaskState.NAME), ReindexPersistentTaskState::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(ReindexPersistentTaskState.NAME),
ReindexPersistentTaskState::fromXContent));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public class ReindexTask extends AllocatedPersistentTask {

private static final Logger logger = LogManager.getLogger(ReindexTask.class);

// TODO: Name
public static final String NAME = "reindex/job";
public static final String NAME = "reindex";

private final NodeClient client;
private final ReindexIndexClient reindexIndexClient;
Expand All @@ -59,7 +58,7 @@ public class ReindexTask extends AllocatedPersistentTask {
private volatile String description;
private volatile boolean assignmentConflictDetected;

public static class ReindexPersistentTasksExecutor extends PersistentTasksExecutor<ReindexJob> {
public static class ReindexPersistentTasksExecutor extends PersistentTasksExecutor<ReindexTaskParams> {

private final ClusterService clusterService;
private final Client client;
Expand All @@ -80,14 +79,14 @@ public static class ReindexPersistentTasksExecutor extends PersistentTasksExecut
}

@Override
protected void nodeOperation(AllocatedPersistentTask task, ReindexJob reindexJob, PersistentTaskState state) {
protected void nodeOperation(AllocatedPersistentTask task, ReindexTaskParams reindexTaskParams, PersistentTaskState state) {
ReindexTask reindexTask = (ReindexTask) task;
reindexTask.execute(reindexJob);
reindexTask.execute(reindexTaskParams);
}

@Override
protected AllocatedPersistentTask createTask(long id, String type, String action, TaskId parentTaskId,
PersistentTasksCustomMetaData.PersistentTask<ReindexJob> taskInProgress,
PersistentTasksCustomMetaData.PersistentTask<ReindexTaskParams> taskInProgress,
Map<String, String> headers) {
headers.putAll(taskInProgress.getParams().getHeaders());
Reindexer reindexer = new Reindexer(clusterService, client, threadPool, scriptService, reindexSslConfig);
Expand Down Expand Up @@ -139,20 +138,20 @@ BulkByScrollTask getChildTask() {
return childTask;
}

private void execute(ReindexJob reindexJob) {
private void execute(ReindexTaskParams reindexTaskParams) {
long allocationId = getAllocationId();

ReindexTaskStateUpdater taskUpdater = new ReindexTaskStateUpdater(reindexIndexClient, client.threadPool(), getPersistentTaskId(),
allocationId, new ActionListener<>() {
@Override
public void onResponse(ReindexTaskStateDoc stateDoc) {
reindexDone(stateDoc, reindexJob.shouldStoreResult());
reindexDone(stateDoc, reindexTaskParams.shouldStoreResult());
}

@Override
public void onFailure(Exception e) {
logger.info("Reindex task failed", e);
updateClusterStateToFailed(reindexJob.shouldStoreResult(), ReindexJobState.Status.DONE, e);
updateClusterStateToFailed(reindexTaskParams.shouldStoreResult(), ReindexPersistentTaskState.Status.DONE, e);
}
}, this::handleCheckpointAssignmentConflict);

Expand All @@ -166,7 +165,7 @@ public void onResponse(ReindexTaskStateDoc stateDoc) {
public void onResponse(Void aVoid) {
// TODO: need to store status in state so we can continue from it.
transientStatus = childTask.getStatus();
performReindex(reindexJob, stateDoc, taskUpdater);
performReindex(reindexTaskParams, stateDoc, taskUpdater);
}

@Override
Expand All @@ -178,7 +177,7 @@ public void onFailure(Exception e) {

@Override
public void onFailure(Exception ex) {
updateClusterStateToFailed(reindexJob.shouldStoreResult(), ReindexJobState.Status.ASSIGNMENT_FAILED, ex);
updateClusterStateToFailed(reindexTaskParams.shouldStoreResult(), ReindexPersistentTaskState.Status.ASSIGNMENT_FAILED, ex);
}
});
}
Expand All @@ -196,7 +195,7 @@ protected void onCancelled() {

private void reindexDone(ReindexTaskStateDoc stateDoc, boolean shouldStoreResult) {
TaskManager taskManager = getTaskManager();
updatePersistentTaskState(new ReindexJobState(taskId, ReindexJobState.Status.DONE), new ActionListener<>() {
updatePersistentTaskState(new ReindexPersistentTaskState(taskId, ReindexPersistentTaskState.Status.DONE), new ActionListener<>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
if (shouldStoreResult) {
Expand Down Expand Up @@ -226,7 +225,7 @@ public void onFailure(Exception ex) {
}

private void sendStartedNotification(boolean shouldStoreResult) {
updatePersistentTaskState(new ReindexJobState(taskId, ReindexJobState.Status.STARTED), new ActionListener<>() {
updatePersistentTaskState(new ReindexPersistentTaskState(taskId, ReindexPersistentTaskState.Status.STARTED), new ActionListener<>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
}
Expand All @@ -239,14 +238,14 @@ public void onFailure(Exception e) {
});
}

private void performReindex(ReindexJob reindexJob, ReindexTaskStateDoc stateDoc, ReindexTaskStateUpdater taskUpdater) {
private void performReindex(ReindexTaskParams reindexTaskParams, ReindexTaskStateDoc stateDoc, ReindexTaskStateUpdater taskUpdater) {
ReindexRequest reindexRequest = stateDoc.getReindexRequest();
ScrollableHitSource.Checkpoint initialCheckpoint = stateDoc.getCheckpoint();
ThreadContext threadContext = client.threadPool().getThreadContext();

Supplier<ThreadContext.StoredContext> context = threadContext.newRestorableContext(false);
// TODO: Eventually we only want to retain security context
try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, reindexJob.getHeaders())) {
try (ThreadContext.StoredContext ignore = stashWithHeaders(threadContext, reindexTaskParams.getHeaders())) {
reindexer.execute(childTask, reindexRequest, new ContextPreservingActionListener<>(context, new ActionListener<>() {
@Override
public void onResponse(BulkByScrollResponse response) {
Expand All @@ -264,11 +263,11 @@ public void onFailure(Exception e) {
}, true);
}
// send this after we started reindex to ensure sub-tasks are created.
sendStartedNotification(reindexJob.shouldStoreResult());
sendStartedNotification(reindexTaskParams.shouldStoreResult());
}

private void updateClusterStateToFailed(boolean shouldStoreResult, ReindexJobState.Status status, Exception ex) {
updatePersistentTaskState(new ReindexJobState(taskId, status), new ActionListener<>() {
private void updateClusterStateToFailed(boolean shouldStoreResult, ReindexPersistentTaskState.Status status, Exception ex) {
updatePersistentTaskState(new ReindexPersistentTaskState(taskId, status), new ActionListener<>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
markEphemeralTaskFailed(shouldStoreResult, ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,13 @@
import java.io.IOException;
import java.util.Map;

public class ReindexJob implements PersistentTaskParams {
public class ReindexTaskParams implements PersistentTaskParams {

// TODO: Name
public static final String NAME = ReindexTask.NAME;
public static final String NAME = "reindex_task_params";

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<ReindexJob, Void> PARSER
= new ConstructingObjectParser<>(NAME, a -> new ReindexJob((Boolean) a[0], (Map<String, String>) a[1]));
public static final ConstructingObjectParser<ReindexTaskParams, Void> PARSER
= new ConstructingObjectParser<>(NAME, a -> new ReindexTaskParams((Boolean) a[0], (Map<String, String>) a[1]));

private static String STORE_RESULT = "store_result";
private static String HEADERS = "headers";
Expand All @@ -51,12 +50,12 @@ public class ReindexJob implements PersistentTaskParams {
private final boolean storeResult;
private final Map<String, String> headers;

public ReindexJob(boolean storeResult, Map<String, String> headers) {
public ReindexTaskParams(boolean storeResult, Map<String, String> headers) {
this.storeResult = storeResult;
this.headers = headers;
}

public ReindexJob(StreamInput in) throws IOException {
public ReindexTaskParams(StreamInput in) throws IOException {
storeResult = in.readBoolean();
headers = in.readMap(StreamInput::readString, StreamInput::readString);
}
Expand Down Expand Up @@ -94,7 +93,7 @@ public Map<String, String> getHeaders() {
return headers;
}

public static ReindexJob fromXContent(XContentParser parser) {
public static ReindexTaskParams fromXContent(XContentParser parser) {
return PARSER.apply(parser, null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
boolean waitForCompletion = request.paramAsBoolean("wait_for_completion", true);

// Build the internal request
StartReindexJobAction.Request internal = new StartReindexJobAction.Request(setCommonOptions(request, buildRequest(request)),
StartReindexTaskAction.Request internal = new StartReindexTaskAction.Request(setCommonOptions(request, buildRequest(request)),
waitForCompletion);
/*
* Let's try and validate before forking so the user gets some error. The
Expand All @@ -81,12 +81,12 @@ public RestChannelConsumer prepareRequest(RestRequest request, NodeClient client
params.put(BulkByScrollTask.Status.INCLUDE_CREATED, Boolean.toString(true));
params.put(BulkByScrollTask.Status.INCLUDE_UPDATED, Boolean.toString(true));

return channel -> client.execute(StartReindexJobAction.INSTANCE, internal, new ActionListener<>() {
return channel -> client.execute(StartReindexTaskAction.INSTANCE, internal, new ActionListener<>() {

private BulkIndexByScrollResponseContentListener listener = new BulkIndexByScrollResponseContentListener(channel, params);

@Override
public void onResponse(StartReindexJobAction.Response response) {
public void onResponse(StartReindexTaskAction.Response response) {
listener.onResponse(response.getReindexResponse());
}

Expand All @@ -96,9 +96,9 @@ public void onFailure(Exception e) {
}
});
} else {
return channel -> client.execute(StartReindexJobAction.INSTANCE, internal, new RestBuilderListener<>(channel) {
return channel -> client.execute(StartReindexTaskAction.INSTANCE, internal, new RestBuilderListener<>(channel) {
@Override
public RestResponse buildResponse(StartReindexJobAction.Response response, XContentBuilder builder) throws Exception {
public RestResponse buildResponse(StartReindexTaskAction.Response response, XContentBuilder builder) throws Exception {
builder.startObject();
// This is the ephemeral task-id from the first node that is assigned the task (for BWC).
builder.field("task", response.getTaskId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@

import java.io.IOException;

public class StartReindexJobAction extends ActionType<StartReindexJobAction.Response> {
public class StartReindexTaskAction extends ActionType<StartReindexTaskAction.Response> {

public static final StartReindexJobAction INSTANCE = new StartReindexJobAction();
// TODO: Name
public static final String NAME = "indices:data/write/start_reindex";
public static final StartReindexTaskAction INSTANCE = new StartReindexTaskAction();
public static final String NAME = "indices:data/write/reindex/start";

private StartReindexJobAction() {
private StartReindexTaskAction() {
super(NAME, Response::new);
}

Expand Down
Loading

0 comments on commit 281549f

Please sign in to comment.