From 04d01fee60456774e9e7b96f1581c77b3af089b9 Mon Sep 17 00:00:00 2001 From: Keith Massey Date: Tue, 10 Dec 2024 14:16:32 -0600 Subject: [PATCH] [8.x] Adding get migration reindex status (#118267) (#118361) * Adding get migration reindex status (#118267) This adds a new transport action to get the status of a migration reindex (started via the API at #118109), and a new rest action to use it. The rest action accepts the data stream or index name, and returns the status. For example if the reindex task exists for data stream `my-data-stream`: ``` GET /_migration/reindex/my-data-stream/_status?pretty ``` returns ``` { "start_time" : 1733519098570, "complete" : true, "total_indices" : 1, "total_indices_requiring_upgrade" : 0, "successes" : 0, "in_progress" : 0, "pending" : 0, "errors" : [ ] } ``` If a reindex task does not exist: ``` GET _migration/reindex/my-data-stream/_status?pretty ``` Then a 404 is returned: ``` { "error" : { "root_cause" : [ { "type" : "resource_not_found_exception", "reason" : "No migration reindex status found for [my-data-stream]" } ], "type" : "resource_not_found_exception", "reason" : "No migration reindex status found for [my-data-stream]" }, "status" : 404 } ``` * adding migration reindex actions to OperatorPrivilegesIT --- docs/changelog/118267.yaml | 5 + .../api/migrate.get_reindex_status.json | 31 ++++ .../ReindexDataStreamTransportActionIT.java | 32 +++- .../xpack/migrate/MigratePlugin.java | 5 + .../GetMigrationReindexStatusAction.java | 143 ++++++++++++++++++ ...MigrationReindexStatusTransportAction.java | 126 +++++++++++++++ .../action/ReindexDataStreamAction.java | 1 + .../ReindexDataStreamTransportAction.java | 4 +- .../RestGetMigrationReindexStatusAction.java | 39 +++++ ...rationReindexStatusActionRequestTests.java | 31 ++++ ...ationReindexStatusActionResponseTests.java | 128 ++++++++++++++++ .../xpack/security/operator/Constants.java | 3 +- .../test/migrate/20_reindex_status.yml | 56 +++++++ 13 files changed, 595 insertions(+), 9 deletions(-) create mode 100644 docs/changelog/118267.yaml create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/api/migrate.get_reindex_status.json create mode 100644 x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusAction.java create mode 100644 x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusTransportAction.java create mode 100644 x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestGetMigrationReindexStatusAction.java create mode 100644 x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusActionRequestTests.java create mode 100644 x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusActionResponseTests.java create mode 100644 x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/20_reindex_status.yml diff --git a/docs/changelog/118267.yaml b/docs/changelog/118267.yaml new file mode 100644 index 0000000000000..3e3920caeb0f9 --- /dev/null +++ b/docs/changelog/118267.yaml @@ -0,0 +1,5 @@ +pr: 118267 +summary: Adding get migration reindex status +area: Data streams +type: enhancement +issues: [] diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/migrate.get_reindex_status.json b/rest-api-spec/src/main/resources/rest-api-spec/api/migrate.get_reindex_status.json new file mode 100644 index 0000000000000..057269598a7d8 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/migrate.get_reindex_status.json @@ -0,0 +1,31 @@ +{ + "migrate.get_reindex_status":{ + "documentation":{ + "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-stream-reindex.html", + "description":"This API returns the status of a migration reindex attempt for a data stream or index" + }, + "stability":"experimental", + "visibility":"private", + "headers":{ + "accept": [ "application/json"], + "content_type": ["application/json"] + }, + "url":{ + "paths":[ + { + "path":"/_migration/reindex/{index}/_status", + "methods":[ + "GET" + ], + "parts":{ + "index":{ + "type":"string", + "description":"The index or data stream name" + } + } + } + ] + } + } +} + diff --git a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java index 62716e11f1720..515250bb58a94 100644 --- a/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java +++ b/x-pack/plugin/migrate/src/internalClusterTest/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportActionIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.xpack.migrate.MigratePlugin; import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamRequest; import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.ReindexDataStreamResponse; +import org.elasticsearch.xpack.migrate.task.ReindexDataStreamStatus; import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTask; import java.util.Collection; @@ -68,7 +69,7 @@ public void testAlreadyUpToDateDataStream() throws Exception { ReindexDataStreamAction.Mode.UPGRADE, dataStreamName ); - createDataStream(dataStreamName); + final int backingIndexCount = createDataStream(dataStreamName); ReindexDataStreamResponse response = client().execute( new ActionType(ReindexDataStreamAction.NAME), reindexDataStreamRequest @@ -78,7 +79,6 @@ public void testAlreadyUpToDateDataStream() throws Exception { AtomicReference runningTask = new AtomicReference<>(); for (TransportService transportService : internalCluster().getInstances(TransportService.class)) { TaskManager taskManager = transportService.getTaskManager(); - Map tasksMap = taskManager.getCancellableTasks(); Optional> optionalTask = taskManager.getCancellableTasks() .entrySet() .stream() @@ -99,9 +99,24 @@ public void testAlreadyUpToDateDataStream() throws Exception { assertThat(task.getStatus().pending(), equalTo(0)); assertThat(task.getStatus().inProgress(), equalTo(0)); assertThat(task.getStatus().errors().size(), equalTo(0)); + + assertBusy(() -> { + GetMigrationReindexStatusAction.Response statusResponse = client().execute( + new ActionType(GetMigrationReindexStatusAction.NAME), + new GetMigrationReindexStatusAction.Request(dataStreamName) + ).actionGet(); + ReindexDataStreamStatus status = (ReindexDataStreamStatus) statusResponse.getTask().getTask().status(); + assertThat(status.complete(), equalTo(true)); + assertThat(status.errors(), equalTo(List.of())); + assertThat(status.exception(), equalTo(null)); + assertThat(status.pending(), equalTo(0)); + assertThat(status.inProgress(), equalTo(0)); + assertThat(status.totalIndices(), equalTo(backingIndexCount)); + assertThat(status.totalIndicesToBeUpgraded(), equalTo(0)); + }); } - private void createDataStream(String dataStreamName) { + private int createDataStream(String dataStreamName) { final TransportPutComposableIndexTemplateAction.Request putComposableTemplateRequest = new TransportPutComposableIndexTemplateAction.Request("my-template"); putComposableTemplateRequest.indexTemplate( @@ -125,10 +140,13 @@ private void createDataStream(String dataStreamName) { client().execute(CreateDataStreamAction.INSTANCE, createDataStreamRequest) ); assertThat(createDataStreamResponse.isAcknowledged(), is(true)); - indexDocs(dataStreamName); - safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(false).execute()); - indexDocs(dataStreamName); - safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(false).execute()); + int backingIndices = 1; + for (int i = 0; i < randomIntBetween(2, 5); i++) { + indexDocs(dataStreamName); + safeGet(new RolloverRequestBuilder(client()).setRolloverTarget(dataStreamName).lazy(false).execute()); + backingIndices++; + } + return backingIndices; } private void indexDocs(String dataStreamName) { diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java index ac9e38da07421..1af66a2c61d56 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/MigratePlugin.java @@ -32,8 +32,11 @@ import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xcontent.NamedXContentRegistry; import org.elasticsearch.xcontent.ParseField; +import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction; +import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusTransportAction; import org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction; import org.elasticsearch.xpack.migrate.action.ReindexDataStreamTransportAction; +import org.elasticsearch.xpack.migrate.rest.RestGetMigrationReindexStatusAction; import org.elasticsearch.xpack.migrate.rest.RestMigrationReindexAction; import org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskExecutor; import org.elasticsearch.xpack.migrate.task.ReindexDataStreamPersistentTaskState; @@ -65,6 +68,7 @@ public List getRestHandlers( List handlers = new ArrayList<>(); if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) { handlers.add(new RestMigrationReindexAction()); + handlers.add(new RestGetMigrationReindexStatusAction()); } return handlers; } @@ -74,6 +78,7 @@ public List getRestHandlers( List> actions = new ArrayList<>(); if (REINDEX_DATA_STREAM_FEATURE_FLAG.isEnabled()) { actions.add(new ActionHandler<>(ReindexDataStreamAction.INSTANCE, ReindexDataStreamTransportAction.class)); + actions.add(new ActionHandler<>(GetMigrationReindexStatusAction.INSTANCE, GetMigrationReindexStatusTransportAction.class)); } return actions; } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusAction.java new file mode 100644 index 0000000000000..68ccaef4bf02c --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusAction.java @@ -0,0 +1,143 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskResult; +import org.elasticsearch.xcontent.ToXContentObject; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public class GetMigrationReindexStatusAction extends ActionType { + + public static final GetMigrationReindexStatusAction INSTANCE = new GetMigrationReindexStatusAction(); + public static final String NAME = "indices:admin/migration/reindex_status"; + + public GetMigrationReindexStatusAction() { + super(NAME); + } + + public static class Response extends ActionResponse implements ToXContentObject { + private final TaskResult task; + + public Response(TaskResult task) { + this.task = requireNonNull(task, "task is required"); + } + + public Response(StreamInput in) throws IOException { + super(in); + task = in.readOptionalWriteable(TaskResult::new); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeOptionalWriteable(task); + } + + /** + * Get the actual result of the fetch. + */ + public TaskResult getTask() { + return task; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + Task.Status status = task.getTask().status(); + if (status != null) { + task.getTask().status().toXContent(builder, params); + } + return builder; + } + + @Override + public int hashCode() { + return Objects.hashCode(task); + } + + @Override + public boolean equals(Object other) { + return other instanceof Response && task.equals(((Response) other).task); + } + + @Override + public String toString() { + String toString = Strings.toString(this); + return toString.isEmpty() ? "unavailable" : toString; + } + + } + + public static class Request extends ActionRequest implements IndicesRequest { + private final String index; + + public Request(String index) { + super(); + this.index = index; + } + + public Request(StreamInput in) throws IOException { + super(in); + this.index = in.readString(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeString(index); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + public String getIndex() { + return index; + } + + @Override + public int hashCode() { + return Objects.hashCode(index); + } + + @Override + public boolean equals(Object other) { + return other instanceof Request && index.equals(((Request) other).index); + } + + public Request nodeRequest(String thisNodeId, long thisTaskId) { + Request copy = new Request(index); + copy.setParentTask(thisNodeId, thisTaskId); + return copy; + } + + @Override + public String[] indices() { + return new String[] { index }; + } + + @Override + public IndicesOptions indicesOptions() { + return IndicesOptions.strictSingleIndexNoExpandForbidClosed(); + } + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusTransportAction.java new file mode 100644 index 0000000000000..f2a6e33f7cb05 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusTransportAction.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.ResourceNotFoundException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionListenerResponseHandler; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.core.Strings; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.persistent.AllocatedPersistentTask; +import org.elasticsearch.persistent.PersistentTasksCustomMetadata; +import org.elasticsearch.tasks.CancellableTask; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.tasks.TaskResult; +import org.elasticsearch.transport.TransportRequestOptions; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction.Request; +import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction.Response; + +import java.util.Map; +import java.util.Optional; + +public class GetMigrationReindexStatusTransportAction extends HandledTransportAction { + private final ClusterService clusterService; + private final TransportService transportService; + + @Inject + public GetMigrationReindexStatusTransportAction( + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters + ) { + super(GetMigrationReindexStatusAction.NAME, transportService, actionFilters, Request::new, EsExecutors.DIRECT_EXECUTOR_SERVICE); + this.clusterService = clusterService; + this.transportService = transportService; + } + + @Override + protected void doExecute(Task task, Request request, ActionListener listener) { + String index = request.getIndex(); + String persistentTaskId = ReindexDataStreamAction.TASK_ID_PREFIX + index; + PersistentTasksCustomMetadata persistentTasksCustomMetadata = clusterService.state() + .getMetadata() + .custom(PersistentTasksCustomMetadata.TYPE); + PersistentTasksCustomMetadata.PersistentTask persistentTask = persistentTasksCustomMetadata.getTask(persistentTaskId); + if (persistentTask == null) { + listener.onFailure(new ResourceNotFoundException("No migration reindex status found for [{}]", index)); + } else if (persistentTask.isAssigned()) { + String nodeId = persistentTask.getExecutorNode(); + if (clusterService.localNode().getId().equals(nodeId)) { + getRunningTaskFromNode(persistentTaskId, listener); + } else { + runOnNodeWithTaskIfPossible(task, request, nodeId, listener); + } + } else { + listener.onFailure(new ElasticsearchException("Persistent task with id [{}] is not assigned to a node", persistentTaskId)); + } + } + + private Task getRunningPersistentTaskFromTaskManager(String persistentTaskId) { + Optional> optionalTask = taskManager.getCancellableTasks() + .entrySet() + .stream() + .filter(entry -> entry.getValue().getType().equals("persistent")) + .filter( + entry -> entry.getValue() instanceof AllocatedPersistentTask + && persistentTaskId.equals((((AllocatedPersistentTask) entry.getValue()).getPersistentTaskId())) + ) + .findAny(); + return optionalTask.map(Map.Entry::getValue).orElse(null); + } + + void getRunningTaskFromNode(String persistentTaskId, ActionListener listener) { + Task runningTask = getRunningPersistentTaskFromTaskManager(persistentTaskId); + if (runningTask == null) { + listener.onFailure( + new ResourceNotFoundException( + Strings.format( + "Persistent task [{}] is supposed to be running on node [{}], " + "but the task is not found on that node", + persistentTaskId, + clusterService.localNode().getId() + ) + ) + ); + } else { + TaskInfo info = runningTask.taskInfo(clusterService.localNode().getId(), true); + listener.onResponse(new Response(new TaskResult(false, info))); + } + } + + private void runOnNodeWithTaskIfPossible(Task thisTask, Request request, String nodeId, ActionListener listener) { + DiscoveryNode node = clusterService.state().nodes().get(nodeId); + if (node == null) { + listener.onFailure( + new ResourceNotFoundException( + Strings.format( + "Persistent task [{}] is supposed to be running on node [{}], but that node is not part of the cluster", + request.getIndex(), + nodeId + ) + ) + ); + } else { + Request nodeRequest = request.nodeRequest(clusterService.localNode().getId(), thisTask.getId()); + transportService.sendRequest( + node, + GetMigrationReindexStatusAction.NAME, + nodeRequest, + TransportRequestOptions.EMPTY, + new ActionListenerResponseHandler<>(listener, Response::new, EsExecutors.DIRECT_EXECUTOR_SERVICE) + ); + } + } +} diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java index eb7a910df8c0c..9e4cbb1082215 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamAction.java @@ -31,6 +31,7 @@ public class ReindexDataStreamAction extends ActionType { public static final FeatureFlag REINDEX_DATA_STREAM_FEATURE_FLAG = new FeatureFlag("reindex_data_stream"); + public static final String TASK_ID_PREFIX = "reindex-data-stream-"; public static final ReindexDataStreamAction INSTANCE = new ReindexDataStreamAction(); public static final String NAME = "indices:admin/data_stream/reindex"; diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java index 7f68007f821ba..95a078690a055 100644 --- a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/action/ReindexDataStreamTransportAction.java @@ -26,6 +26,8 @@ import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTask; import org.elasticsearch.xpack.migrate.task.ReindexDataStreamTaskParams; +import static org.elasticsearch.xpack.migrate.action.ReindexDataStreamAction.TASK_ID_PREFIX; + /* * This transport action creates a new persistent task for reindexing the source data stream given in the request. On successful creation * of the persistent task, it responds with the persistent task id so that the user can monitor the persistent task. @@ -87,6 +89,6 @@ protected void doExecute(Task task, ReindexDataStreamRequest request, ActionList } private String getPersistentTaskId(String dataStreamName) throws ResourceAlreadyExistsException { - return "reindex-data-stream-" + dataStreamName; + return TASK_ID_PREFIX + dataStreamName; } } diff --git a/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestGetMigrationReindexStatusAction.java b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestGetMigrationReindexStatusAction.java new file mode 100644 index 0000000000000..759104dd6f100 --- /dev/null +++ b/x-pack/plugin/migrate/src/main/java/org/elasticsearch/xpack/migrate/rest/RestGetMigrationReindexStatusAction.java @@ -0,0 +1,39 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.rest; + +import org.elasticsearch.client.internal.node.NodeClient; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction; + +import java.io.IOException; +import java.util.List; + +import static org.elasticsearch.rest.RestRequest.Method.GET; + +public class RestGetMigrationReindexStatusAction extends BaseRestHandler { + + @Override + public String getName() { + return "get_migration_reindex_status_action"; + } + + @Override + public List routes() { + return List.of(new Route(GET, "/_migration/reindex/{index}/_status")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String index = request.param("index"); + GetMigrationReindexStatusAction.Request getTaskRequest = new GetMigrationReindexStatusAction.Request(index); + return channel -> client.execute(GetMigrationReindexStatusAction.INSTANCE, getTaskRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusActionRequestTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusActionRequestTests.java new file mode 100644 index 0000000000000..6943cf26f2b5e --- /dev/null +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusActionRequestTests.java @@ -0,0 +1,31 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction.Request; + +import java.io.IOException; + +public class GetMigrationReindexStatusActionRequestTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return Request::new; + } + + @Override + protected Request createTestInstance() { + return new Request(randomAlphaOfLength(100)); + } + + @Override + protected Request mutateInstance(Request instance) throws IOException { + return createTestInstance(); // There's only one field + } +} diff --git a/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusActionResponseTests.java b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusActionResponseTests.java new file mode 100644 index 0000000000000..a18030edbf42c --- /dev/null +++ b/x-pack/plugin/migrate/src/test/java/org/elasticsearch/xpack/migrate/action/GetMigrationReindexStatusActionResponseTests.java @@ -0,0 +1,128 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.migrate.action; + +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.tasks.RawTaskStatus; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.tasks.TaskId; +import org.elasticsearch.tasks.TaskInfo; +import org.elasticsearch.tasks.TaskResult; +import org.elasticsearch.test.AbstractWireSerializingTestCase; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; +import org.elasticsearch.xpack.migrate.action.GetMigrationReindexStatusAction.Response; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; +import java.util.TreeMap; + +public class GetMigrationReindexStatusActionResponseTests extends AbstractWireSerializingTestCase { + @Override + protected Writeable.Reader instanceReader() { + return Response::new; + } + + @Override + protected Response createTestInstance() { + try { + return new Response(randomTaskResult()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + protected Response mutateInstance(Response instance) throws IOException { + return createTestInstance(); // There's only one field + } + + private static TaskResult randomTaskResult() throws IOException { + return switch (between(0, 2)) { + case 0 -> new TaskResult(randomBoolean(), randomTaskInfo()); + case 1 -> new TaskResult(randomTaskInfo(), new RuntimeException("error")); + case 2 -> new TaskResult(randomTaskInfo(), randomTaskResponse()); + default -> throw new UnsupportedOperationException("Unsupported random TaskResult constructor"); + }; + } + + static TaskInfo randomTaskInfo() { + String nodeId = randomAlphaOfLength(5); + TaskId taskId = randomTaskId(nodeId); + String type = randomAlphaOfLength(5); + String action = randomAlphaOfLength(5); + Task.Status status = randomBoolean() ? randomRawTaskStatus() : null; + String description = randomBoolean() ? randomAlphaOfLength(5) : null; + long startTime = randomLong(); + long runningTimeNanos = randomNonNegativeLong(); + boolean cancellable = randomBoolean(); + boolean cancelled = cancellable && randomBoolean(); + TaskId parentTaskId = randomBoolean() ? TaskId.EMPTY_TASK_ID : randomTaskId(randomAlphaOfLength(5)); + Map headers = randomBoolean() + ? Collections.emptyMap() + : Collections.singletonMap(randomAlphaOfLength(5), randomAlphaOfLength(5)); + return new TaskInfo( + taskId, + type, + nodeId, + action, + description, + status, + startTime, + runningTimeNanos, + cancellable, + cancelled, + parentTaskId, + headers + ); + } + + private static TaskId randomTaskId(String nodeId) { + return new TaskId(nodeId, randomLong()); + } + + private static RawTaskStatus randomRawTaskStatus() { + try (XContentBuilder builder = XContentBuilder.builder(XContentType.JSON.xContent())) { + builder.startObject(); + int fields = between(0, 10); + for (int f = 0; f < fields; f++) { + builder.field(randomAlphaOfLength(5), randomAlphaOfLength(5)); + } + builder.endObject(); + return new RawTaskStatus(BytesReference.bytes(builder)); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + private static ToXContent randomTaskResponse() { + Map result = new TreeMap<>(); + int fields = between(0, 10); + for (int f = 0; f < fields; f++) { + result.put(randomAlphaOfLength(5), randomAlphaOfLength(5)); + } + return (builder, params) -> { + for (Map.Entry entry : result.entrySet()) { + builder.field(entry.getKey(), entry.getValue()); + } + return builder; + }; + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(NetworkModule.getNamedWriteables()); + // return new NamedWriteableRegistry(List.of(new NamedWriteableRegistry.Entry(Task.Status.class, RawTaskStatus.NAME, + // RawTaskStatus::new))); + } +} diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index 8df10037affdb..fe919557968c7 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -493,7 +493,6 @@ public class Constants { "indices:admin/block/add[s]", "indices:admin/cache/clear", "indices:admin/data_stream/lazy_rollover", - "indices:admin/data_stream/reindex", "indices:internal/admin/ccr/restore/file_chunk/get", "indices:internal/admin/ccr/restore/session/clear", "indices:internal/admin/ccr/restore/session/put", @@ -636,6 +635,8 @@ public class Constants { "internal:gateway/local/started_shards", "internal:admin/indices/prevalidate_shard_path", "internal:index/metadata/migration_version/update", + "indices:admin/migration/reindex_status", + "indices:admin/data_stream/reindex", "internal:admin/repository/verify", "internal:admin/repository/verify/coordinate" ).filter(Objects::nonNull).collect(Collectors.toUnmodifiableSet()); diff --git a/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/20_reindex_status.yml b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/20_reindex_status.yml new file mode 100644 index 0000000000000..3fe133aeda70e --- /dev/null +++ b/x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/migrate/20_reindex_status.yml @@ -0,0 +1,56 @@ +--- +setup: + - do: + cluster.health: + wait_for_status: yellow + +--- +"Test get reindex status with nonexistent task id": + - do: + catch: /resource_not_found_exception/ + migrate.get_reindex_status: + index: "does_not_exist" + +--- +"Test Reindex With Existing Data Stream": + - do: + indices.put_index_template: + name: my-template1 + body: + index_patterns: [my-data-stream*] + template: + mappings: + properties: + '@timestamp': + type: date + 'foo': + type: keyword + data_stream: {} + + - do: # superuser + indices.create_data_stream: + name: my-data-stream + - is_true: acknowledged + +# Uncomment once the cancel API is in place +# - do: +# migrate.reindex: +# body: | +# { +# "mode": "upgrade", +# "source": { +# "index": "my-data-stream" +# } +# } +# - match: { acknowledged: true } +# +# - do: +# migrate.get_reindex_status: +# index: "my-data-stream" +# - match: { complete: true } +# - match: { total_indices: 1 } +# - match: { total_indices_requiring_upgrade: 0 } +# - match: { successes: 0 } +# - match: { in_progress: 0 } +# - match: { pending: 0 } +# - match: { errors: [] }