From 7955c5d0291a709880a16738d9db38c618822311 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 12 Sep 2022 12:21:18 +0200 Subject: [PATCH] Turn RecoveryResponse into a chunked REST response These can be huge, make them chunked to be nice to the coordinating node. relates #89838 --- .../indices/recovery/RecoveryResponse.java | 49 ++++++------- .../admin/indices/RestRecoveryAction.java | 16 ++++- .../recovery/RecoveryResponseTests.java | 68 +++++++++++++++++++ 3 files changed, 108 insertions(+), 25 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponseTests.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java index 1a0a2f1b61e57..dc145acb27db3 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponse.java @@ -11,19 +11,22 @@ import org.elasticsearch.action.support.DefaultShardOperationFailedException; import org.elasticsearch.action.support.broadcast.BroadcastResponse; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.indices.recovery.RecoveryState; -import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.ToXContent; import java.io.IOException; +import java.util.Iterator; import java.util.List; import java.util.Map; /** * Information regarding the recovery state of indices and their associated shards. */ -public class RecoveryResponse extends BroadcastResponse { +public class RecoveryResponse extends BroadcastResponse implements ChunkedToXContent { private final Map> shardRecoveryStates; @@ -62,27 +65,27 @@ public Map> shardRecoveryStates() { } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - if (hasRecoveries()) { - for (String index : shardRecoveryStates.keySet()) { - List recoveryStates = shardRecoveryStates.get(index); - if (recoveryStates == null || recoveryStates.size() == 0) { - continue; - } - builder.startObject(index); - builder.startArray("shards"); - for (RecoveryState recoveryState : recoveryStates) { - builder.startObject(); - recoveryState.toXContent(builder, params); - builder.endObject(); - } - builder.endArray(); - builder.endObject(); - } - } - builder.endObject(); - return builder; + public Iterator toXContentChunked() { + return Iterators.concat( + Iterators.single((b, p) -> b.startObject()), + shardRecoveryStates.entrySet() + .stream() + .filter(entry -> entry != null && entry.getValue().isEmpty() == false) + .map(entry -> (ToXContent) (b, p) -> { + b.startObject(entry.getKey()); + b.startArray("shards"); + for (RecoveryState recoveryState : entry.getValue()) { + b.startObject(); + recoveryState.toXContent(b, p); + b.endObject(); + } + b.endArray(); + b.endObject(); + return b; + }) + .iterator(), + Iterators.single((b, p) -> b.endObject()) + ); } @Override diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRecoveryAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRecoveryAction.java index 10a23c754188c..d8a9c9227fa50 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRecoveryAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestRecoveryAction.java @@ -9,13 +9,17 @@ package org.elasticsearch.rest.action.admin.indices; import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest; +import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse; import org.elasticsearch.action.support.IndicesOptions; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.common.Strings; import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.ChunkedRestResponseBody; import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestResponse; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestActionListener; import org.elasticsearch.rest.action.RestCancellableNodeClient; -import org.elasticsearch.rest.action.RestToXContentListener; import java.io.IOException; import java.util.List; @@ -51,6 +55,14 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC recoveryRequest.indicesOptions(IndicesOptions.fromRequest(request, recoveryRequest.indicesOptions())); return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin() .indices() - .recoveries(recoveryRequest, new RestToXContentListener<>(channel)); + .recoveries(recoveryRequest, new RestActionListener<>(channel) { + @Override + protected void processResponse(RecoveryResponse recoveryResponse) throws IOException { + ensureOpen(); + channel.sendResponse( + new RestResponse(RestStatus.OK, ChunkedRestResponseBody.fromXContent(recoveryResponse, request, channel)) + ); + } + }); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponseTests.java new file mode 100644 index 0000000000000..6cf36a8c12d2e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/recovery/RecoveryResponseTests.java @@ -0,0 +1,68 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.indices.recovery; + +import org.elasticsearch.Version; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RecoverySource; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.routing.UnassignedInfo; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.indices.recovery.RecoveryState; +import org.elasticsearch.test.ESTestCase; + +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; + +public class RecoveryResponseTests extends ESTestCase { + + public void testChunkedToXContent() { + final int failedShards = randomIntBetween(0, 50); + final int successfulShards = randomIntBetween(0, 50); + DiscoveryNode sourceNode = new DiscoveryNode("foo", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + DiscoveryNode targetNode = new DiscoveryNode("bar", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT); + final int shards = randomInt(50); + final RecoveryResponse recoveryResponse = new RecoveryResponse( + successfulShards + failedShards, + successfulShards, + failedShards, + IntStream.range(0, shards) + .boxed() + .collect( + Collectors.toUnmodifiableMap( + i -> "index-" + i, + i -> List.of( + new RecoveryState( + ShardRouting.newUnassigned( + new ShardId("index-" + i, "index-uuid-" + i, 0), + randomBoolean(), + RecoverySource.PeerRecoverySource.INSTANCE, + new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null) + ).initialize(sourceNode.getId(), null, randomNonNegativeLong()), + sourceNode, + targetNode + ) + ) + ) + ), + List.of() + ); + final var iterator = recoveryResponse.toXContentChunked(); + int chunks = 0; + while (iterator.hasNext()) { + iterator.next(); + chunks++; + } + assertEquals(shards + 2, chunks); + } +}