Skip to content

Commit

Permalink
[8.x] Adding get migration reindex status (elastic#118267) (elastic#1…
Browse files Browse the repository at this point in the history
…18361)

* Adding get migration reindex status (elastic#118267)

This adds a new transport action to get the status of a migration
reindex (started via the API at elastic#118109), and a new rest action to use
it. The rest action accepts the data stream or index name, and returns
the status. For example if the reindex task exists for data stream
`my-data-stream`:

```
GET /_migration/reindex/my-data-stream/_status?pretty
```

returns

```
{
  "start_time" : 1733519098570,
  "complete" : true,
  "total_indices" : 1,
  "total_indices_requiring_upgrade" : 0,
  "successes" : 0,
  "in_progress" : 0,
  "pending" : 0,
  "errors" : [ ]
}
```

If a reindex task does not exist:

```
GET _migration/reindex/my-data-stream/_status?pretty
```

Then a 404 is returned:

```
{
  "error" : {
    "root_cause" : [
      {
        "type" : "resource_not_found_exception",
        "reason" : "No migration reindex status found for [my-data-stream]"
      }
    ],
    "type" : "resource_not_found_exception",
    "reason" : "No migration reindex status found for [my-data-stream]"
  },
  "status" : 404
}
```

* adding migration reindex actions to OperatorPrivilegesIT
  • Loading branch information
masseyke authored Dec 10, 2024
1 parent 18974f5 commit 04d01fe
Show file tree
Hide file tree
Showing 13 changed files with 595 additions and 9 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/118267.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 118267
summary: Adding get migration reindex status
area: Data streams
type: enhancement
issues: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"migrate.get_reindex_status":{
"documentation":{
"url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-stream-reindex.html",
"description":"This API returns the status of a migration reindex attempt for a data stream or index"
},
"stability":"experimental",
"visibility":"private",
"headers":{
"accept": [ "application/json"],
"content_type": ["application/json"]
},
"url":{
"paths":[
{
"path":"/_migration/reindex/{index}/_status",
"methods":[
"GET"
],
"parts":{
"index":{
"type":"string",
"description":"The index or data stream name"
}
}
}
]
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.xpack.migrate.MigratePlugin;
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamRequest;
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamResponse;
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamStatus;
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTask;

import java.util.Collection;
Expand Down Expand Up @@ -68,7 +69,7 @@ public void testAlreadyUpToDateDataStream() throws Exception {
ReindexDataStreamAction.Mode.UPGRADE,
dataStreamName
);
createDataStream(dataStreamName);
final int backingIndexCount = createDataStream(dataStreamName);
ReindexDataStreamResponse response = client().execute(
new ActionType<ReindexDataStreamResponse>(ReindexDataStreamAction.NAME),
reindexDataStreamRequest
Expand All @@ -78,7 +79,6 @@ public void testAlreadyUpToDateDataStream() throws Exception {
AtomicReference<ReindexDataStreamTask> runningTask = new AtomicReference<>();
for (TransportService transportService : internalCluster().getInstances(TransportService.class)) {
TaskManager taskManager = transportService.getTaskManager();
Map<Long, CancellableTask> tasksMap = taskManager.getCancellableTasks();
Optional<Map.Entry<Long, CancellableTask>> optionalTask = taskManager.getCancellableTasks()
.entrySet()
.stream()
Expand All @@ -99,9 +99,24 @@ public void testAlreadyUpToDateDataStream() throws Exception {
assertThat(task.getStatus().pending(), equalTo(0));
assertThat(task.getStatus().inProgress(), equalTo(0));
assertThat(task.getStatus().errors().size(), equalTo(0));

assertBusy(() -> {
GetMigrationReindexStatusAction.Response statusResponse = client().execute(
new ActionType<GetMigrationReindexStatusAction.Response>(GetMigrationReindexStatusAction.NAME),
new GetMigrationReindexStatusAction.Request(dataStreamName)
).actionGet();
ReindexDataStreamStatus status = (ReindexDataStreamStatus) statusResponse.getTask().getTask().status();
assertThat(status.complete(), equalTo(true));
assertThat(status.errors(), equalTo(List.of()));
assertThat(status.exception(), equalTo(null));
assertThat(status.pending(), equalTo(0));
assertThat(status.inProgress(), equalTo(0));
assertThat(status.totalIndices(), equalTo(backingIndexCount));
assertThat(status.totalIndicesToBeUpgraded(), equalTo(0));
});
}

private void createDataStream(String dataStreamName) {
private int createDataStream(String dataStreamName) {
final TransportPutComposableIndexTemplateAction.Request putComposableTemplateRequest =
new TransportPutComposableIndexTemplateAction.Request("my-template");
putComposableTemplateRequest.indexTemplate(
Expand All @@ -125,10 +140,13 @@ private void createDataStream(String dataStreamName) {
client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest)
);
assertThat(createDataStreamResponse.isAcknowledged(), is(true));
indexDocs(dataStreamName);
safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(false).execute());
indexDocs(dataStreamName);
safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(false).execute());
int backingIndices = 1;
for (int i = 0; i < randomIntBetween(2, 5); i++) {
indexDocs(dataStreamName);
safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(false).execute());
backingIndices++;
}
return backingIndices;
}

private void indexDocs(String dataStreamName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction;
import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusTransportAction;
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction;
import org.elasticsearch.xpack.migrate.action.ReindexDataStreamTransportAction;
import org.elasticsearch.xpack.migrate.rest.RestGetMigrationReindexStatusAction;
import org.elasticsearch.xpack.migrate.rest.RestMigrationReindexAction;
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor;
import org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskState;
Expand Down Expand Up @@ -65,6 +68,7 @@ public List<RestHandler> getRestHandlers(
List<RestHandler> handlers = new ArrayList<>();
if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) {
handlers.add(new RestMigrationReindexAction());
handlers.add(new RestGetMigrationReindexStatusAction());
}
return handlers;
}
Expand All @@ -74,6 +78,7 @@ public List<RestHandler> getRestHandlers(
List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> actions = new ArrayList<>();
if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) {
actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class));
actions.add(new ActionHandler<>(GetMigrationReindexStatusAction.INSTANCE, GetMigrationReindexStatusTransportAction.class));
}
return actions;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.migrate.action;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskResult;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

import static java.util.Objects.requireNonNull;

public class GetMigrationReindexStatusAction extends ActionType<GetMigrationReindexStatusAction.Response> {

public static final GetMigrationReindexStatusAction INSTANCE = new GetMigrationReindexStatusAction();
public static final String NAME = "indices:admin/migration/reindex_status";

public GetMigrationReindexStatusAction() {
super(NAME);
}

public static class Response extends ActionResponse implements ToXContentObject {
private final TaskResult task;

public Response(TaskResult task) {
this.task = requireNonNull(task, "task is required");
}

public Response(StreamInput in) throws IOException {
super(in);
task = in.readOptionalWriteable(TaskResult::new);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalWriteable(task);
}

/**
* Get the actual result of the fetch.
*/
public TaskResult getTask() {
return task;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
Task.Status status = task.getTask().status();
if (status != null) {
task.getTask().status().toXContent(builder, params);
}
return builder;
}

@Override
public int hashCode() {
return Objects.hashCode(task);
}

@Override
public boolean equals(Object other) {
return other instanceof Response && task.equals(((Response) other).task);
}

@Override
public String toString() {
String toString = Strings.toString(this);
return toString.isEmpty() ? "unavailable" : toString;
}

}

public static class Request extends ActionRequest implements IndicesRequest {
private final String index;

public Request(String index) {
super();
this.index = index;
}

public Request(StreamInput in) throws IOException {
super(in);
this.index = in.readString();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(index);
}

@Override
public ActionRequestValidationException validate() {
return null;
}

public String getIndex() {
return index;
}

@Override
public int hashCode() {
return Objects.hashCode(index);
}

@Override
public boolean equals(Object other) {
return other instanceof Request && index.equals(((Request) other).index);
}

public Request nodeRequest(String thisNodeId, long thisTaskId) {
Request copy = new Request(index);
copy.setParentTask(thisNodeId, thisTaskId);
return copy;
}

@Override
public String[] indices() {
return new String[] { index };
}

@Override
public IndicesOptions indicesOptions() {
return IndicesOptions.strictSingleIndexNoExpandForbidClosed();
}
}
}
Loading

0 comments on commit 04d01fe

Please sign in to comment.