From 908acbebb0b9f49e929f3ecf5ab2969ec2f880ca Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Mon, 21 Nov 2022 14:36:14 +0100 Subject: [PATCH 1/4] Use chunked encoding for indices stats response These responses can become huge, lets chunk them by index. relates #89838 --- .../indices/stats/IndicesStatsResponse.java | 142 +++++++++++------- .../admin/indices/RestIndicesStatsAction.java | 4 +- .../stats/IndicesStatsResponseTests.java | 48 +++++- 3 files changed, 137 insertions(+), 57 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java index 25c804a340a72..a68396eb96658 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java @@ -8,23 +8,28 @@ package org.elasticsearch.action.admin.indices.stats; +import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.stats.IndexStats.IndexStatsBuilder; import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BroadcastResponse; +import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterIndexHealth; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.index.Index; -import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.rest.action.RestActions; +import org.elasticsearch.xcontent.ToXContent; import java.io.IOException; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; @@ -33,7 +38,7 @@ import static java.util.Collections.unmodifiableMap; -public class IndicesStatsResponse extends BroadcastResponse { +public class IndicesStatsResponse extends BaseBroadcastResponse implements ChunkedToXContent { private final Map indexHealthMap; @@ -171,63 +176,96 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException { - final String level = params.param("level", "indices"); - final boolean isLevelValid = "cluster".equalsIgnoreCase(level) - || "indices".equalsIgnoreCase(level) - || "shards".equalsIgnoreCase(level); - if (isLevelValid == false) { - throw new IllegalArgumentException("level parameter must be one of [cluster] or [indices] or [shards] but was [" + level + "]"); - } + public Iterator toXContentChunked() { + final SetOnce levelReference = new SetOnce<>(); + return Iterators.concat(Iterators.single(((builder, params) -> { + final String level = params.param("level", "indices"); + final boolean isLevelValid = "cluster".equalsIgnoreCase(level) + || "indices".equalsIgnoreCase(level) + || "shards".equalsIgnoreCase(level); + if (isLevelValid == false) { + throw new IllegalArgumentException( + "level parameter must be one of [cluster] or [indices] or [shards] but was [" + level + "]" + ); + } + levelReference.set(level); - builder.startObject("_all"); + builder.startObject(); + RestActions.buildBroadcastShardsHeader(builder, params, this); + builder.startObject("_all"); - builder.startObject("primaries"); - getPrimaries().toXContent(builder, params); - builder.endObject(); + builder.startObject("primaries"); + getPrimaries().toXContent(builder, params); + builder.endObject(); - builder.startObject("total"); - getTotal().toXContent(builder, params); - builder.endObject(); + builder.startObject("total"); + getTotal().toXContent(builder, params); + builder.endObject(); - builder.endObject(); + builder.endObject(); + return builder; + })), new Iterator<>() { - if ("indices".equalsIgnoreCase(level) || "shards".equalsIgnoreCase(level)) { - builder.startObject(Fields.INDICES); - for (IndexStats indexStats : getIndices().values()) { - builder.startObject(indexStats.getIndex()); - builder.field("uuid", indexStats.getUuid()); - if (indexStats.getHealth() != null) { - builder.field("health", indexStats.getHealth().toString().toLowerCase(Locale.ROOT)); - } - if (indexStats.getState() != null) { - builder.field("status", indexStats.getState().toString().toLowerCase(Locale.ROOT)); - } - builder.startObject("primaries"); - indexStats.getPrimaries().toXContent(builder, params); - builder.endObject(); - - builder.startObject("total"); - indexStats.getTotal().toXContent(builder, params); - builder.endObject(); - - if ("shards".equalsIgnoreCase(level)) { - builder.startObject(Fields.SHARDS); - for (IndexShardStats indexShardStats : indexStats) { - builder.startArray(Integer.toString(indexShardStats.getShardId().id())); - for (ShardStats shardStats : indexShardStats) { - builder.startObject(); - shardStats.toXContent(builder, params); - builder.endObject(); - } - builder.endArray(); + private Iterator delegate; + + @Override + public boolean hasNext() { + maybeInitDelegate(); + return delegate.hasNext(); + } + + @Override + public ToXContent next() { + maybeInitDelegate(); + return delegate.next(); + } + + private void maybeInitDelegate() { + if (delegate == null) { + final String level = levelReference.get(); + if ("indices".equalsIgnoreCase(level) || "shards".equalsIgnoreCase(level)) { + delegate = Iterators.concat( + Iterators.single((b, p) -> b.startObject(Fields.INDICES)), + getIndices().values().stream().map(indexStats -> (builder, params) -> { + builder.startObject(indexStats.getIndex()); + builder.field("uuid", indexStats.getUuid()); + if (indexStats.getHealth() != null) { + builder.field("health", indexStats.getHealth().toString().toLowerCase(Locale.ROOT)); + } + if (indexStats.getState() != null) { + builder.field("status", indexStats.getState().toString().toLowerCase(Locale.ROOT)); + } + builder.startObject("primaries"); + indexStats.getPrimaries().toXContent(builder, params); + builder.endObject(); + + builder.startObject("total"); + indexStats.getTotal().toXContent(builder, params); + builder.endObject(); + + if ("shards".equalsIgnoreCase(level)) { + builder.startObject(Fields.SHARDS); + for (IndexShardStats indexShardStats : indexStats) { + builder.startArray(Integer.toString(indexShardStats.getShardId().id())); + for (ShardStats shardStats : indexShardStats) { + builder.startObject(); + shardStats.toXContent(builder, params); + builder.endObject(); + } + builder.endArray(); + } + builder.endObject(); + } + return builder.endObject(); + }).iterator(), + Iterators.single((b, p) -> b.endObject().endObject()) + ); + } else { + delegate = Iterators.single((b, p) -> b.endObject()); } - builder.endObject(); } - builder.endObject(); } - builder.endObject(); - } + }); } static final class Fields { diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java index 737c1ccc4cba6..74924e4bfaf7f 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/indices/RestIndicesStatsAction.java @@ -19,7 +19,7 @@ import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; import org.elasticsearch.rest.action.RestCancellableNodeClient; -import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.rest.action.RestChunkedToXContentListener; import org.elasticsearch.rest.action.document.RestMultiTermVectorsAction; import java.io.IOException; @@ -140,7 +140,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin() .indices() - .stats(indicesStatsRequest, new RestToXContentListener<>(channel)); + .stats(indicesStatsRequest, new RestChunkedToXContentListener<>(channel)); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java index e1873b7714bcb..d39503b44641e 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java @@ -13,13 +13,17 @@ import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardPath; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.xcontent.XContentType; import org.elasticsearch.xcontent.json.JsonXContent; +import java.io.IOException; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; @@ -41,7 +45,7 @@ public void testInvalidLevel() { final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level)); final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> response.toXContent(JsonXContent.contentBuilder(), params) + () -> response.toXContentChunked().next().toXContent(JsonXContent.contentBuilder(), params) ); assertThat( e, @@ -64,7 +68,7 @@ public void testGetIndices() { ShardId shId = new ShardId(index, shardId); Path path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve(String.valueOf(shardId)); ShardPath shardPath = new ShardPath(false, path, path, shId); - ShardRouting routing = createShardRouting(index, shId, (shardId == 0)); + ShardRouting routing = createShardRouting(shId, (shardId == 0)); shards.add(new ShardStats(routing, shardPath, null, null, null, null)); AtomicLong primaryShardsCounter = expectedIndexToPrimaryShardsCount.computeIfAbsent( index.getName(), @@ -105,7 +109,45 @@ public void testGetIndices() { } } - private ShardRouting createShardRouting(Index index, ShardId shardId, boolean isPrimary) { + public void testChunkedEncodingPerIndex() throws IOException { + final int shards = randomIntBetween(1, 10); + final List stats = new ArrayList<>(shards); + for (int i = 0; i < shards; i++) { + ShardId shId = new ShardId(createIndex("index-" + i), randomIntBetween(0, 1)); + Path path = createTempDir().resolve("indices").resolve(shId.getIndex().getUUID()).resolve(String.valueOf(shId.id())); + ShardPath shardPath = new ShardPath(false, path, path, shId); + ShardRouting routing = createShardRouting(shId, (shId.id() == 0)); + stats.add(new ShardStats(routing, shardPath, new CommonStats(), null, null, null)); + } + final IndicesStatsResponse indicesStatsResponse = new IndicesStatsResponse( + stats.toArray(new ShardStats[0]), + shards, + shards, + 0, + null, + ClusterState.EMPTY_STATE + ); + final var iteratorClusterLevel = indicesStatsResponse.toXContentChunked(); + final ToXContent.Params paramsClusterLevel = new ToXContent.MapParams(Map.of("level", "cluster")); + int chunksSeenClusterLevel = 0; + final XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), Streams.NULL_OUTPUT_STREAM); + while (iteratorClusterLevel.hasNext()) { + iteratorClusterLevel.next().toXContent(builder, paramsClusterLevel); + chunksSeenClusterLevel++; + } + assertEquals(2, chunksSeenClusterLevel); + + final var iteratorIndexLevel = indicesStatsResponse.toXContentChunked(); + final ToXContent.Params paramsIndexLevel = new ToXContent.MapParams(Map.of("level", "indices")); + int chunksSeenIndexLevel = 0; + while (iteratorIndexLevel.hasNext()) { + iteratorIndexLevel.next().toXContent(builder, paramsIndexLevel); + chunksSeenIndexLevel++; + } + assertEquals(3 + shards, chunksSeenIndexLevel); + } + + private ShardRouting createShardRouting(ShardId shardId, boolean isPrimary) { return TestShardRouting.newShardRouting(shardId, randomAlphaOfLength(4), isPrimary, ShardRoutingState.STARTED); } From ddaf09f41cad64bc45649287a1ab1a2188a8be18 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 24 Nov 2022 12:28:32 +0100 Subject: [PATCH 2/4] nicer thanks to changed API --- .../indices/stats/IndicesStatsResponse.java | 149 ++++++++---------- .../stats/IndicesStatsResponseTests.java | 10 +- 2 files changed, 67 insertions(+), 92 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java index a68396eb96658..b01db2a925839 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java @@ -8,7 +8,6 @@ package org.elasticsearch.action.admin.indices.stats; -import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.stats.IndexStats.IndexStatsBuilder; import org.elasticsearch.action.support.DefaultShardOperationFailedException; @@ -26,6 +25,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.rest.action.RestActions; import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; import java.util.HashMap; @@ -176,96 +176,71 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public Iterator toXContentChunked() { - final SetOnce levelReference = new SetOnce<>(); - return Iterators.concat(Iterators.single(((builder, params) -> { - final String level = params.param("level", "indices"); - final boolean isLevelValid = "cluster".equalsIgnoreCase(level) - || "indices".equalsIgnoreCase(level) - || "shards".equalsIgnoreCase(level); - if (isLevelValid == false) { - throw new IllegalArgumentException( - "level parameter must be one of [cluster] or [indices] or [shards] but was [" + level + "]" - ); - } - levelReference.set(level); - - builder.startObject(); - RestActions.buildBroadcastShardsHeader(builder, params, this); - builder.startObject("_all"); - - builder.startObject("primaries"); - getPrimaries().toXContent(builder, params); - builder.endObject(); - - builder.startObject("total"); - getTotal().toXContent(builder, params); - builder.endObject(); - - builder.endObject(); - return builder; - })), new Iterator<>() { + public Iterator toXContentChunked(ToXContent.Params params) { + final String level = params.param("level", "indices"); + final boolean isLevelValid = "cluster".equalsIgnoreCase(level) + || "indices".equalsIgnoreCase(level) + || "shards".equalsIgnoreCase(level); + if (isLevelValid == false) { + throw new IllegalArgumentException("level parameter must be one of [cluster] or [indices] or [shards] but was [" + level + "]"); + } + if ("indices".equalsIgnoreCase(level) || "shards".equalsIgnoreCase(level)) { + return Iterators.concat(Iterators.single(((builder, p) -> { + headerAndCommonStats(builder, p); + return builder.startObject(Fields.INDICES); + })), getIndices().values().stream().map(indexStats -> (builder, p) -> { + builder.startObject(indexStats.getIndex()); + builder.field("uuid", indexStats.getUuid()); + if (indexStats.getHealth() != null) { + builder.field("health", indexStats.getHealth().toString().toLowerCase(Locale.ROOT)); + } + if (indexStats.getState() != null) { + builder.field("status", indexStats.getState().toString().toLowerCase(Locale.ROOT)); + } + builder.startObject("primaries"); + indexStats.getPrimaries().toXContent(builder, p); + builder.endObject(); + + builder.startObject("total"); + indexStats.getTotal().toXContent(builder, p); + builder.endObject(); + + if ("shards".equalsIgnoreCase(level)) { + builder.startObject(Fields.SHARDS); + for (IndexShardStats indexShardStats : indexStats) { + builder.startArray(Integer.toString(indexShardStats.getShardId().id())); + for (ShardStats shardStats : indexShardStats) { + builder.startObject(); + shardStats.toXContent(builder, p); + builder.endObject(); + } + builder.endArray(); + } + builder.endObject(); + } + return builder.endObject(); + }).iterator(), Iterators.single((b, p) -> b.endObject().endObject())); + } + return Iterators.single((b, p) -> { + headerAndCommonStats(b, p); + return b.endObject(); + }); + } - private Iterator delegate; + private void headerAndCommonStats(XContentBuilder builder, ToXContent.Params p) throws IOException { + builder.startObject(); + RestActions.buildBroadcastShardsHeader(builder, p, this); + builder.startObject("_all"); - @Override - public boolean hasNext() { - maybeInitDelegate(); - return delegate.hasNext(); - } + builder.startObject("primaries"); + getPrimaries().toXContent(builder, p); + builder.endObject(); - @Override - public ToXContent next() { - maybeInitDelegate(); - return delegate.next(); - } + builder.startObject("total"); + getTotal().toXContent(builder, p); + builder.endObject(); - private void maybeInitDelegate() { - if (delegate == null) { - final String level = levelReference.get(); - if ("indices".equalsIgnoreCase(level) || "shards".equalsIgnoreCase(level)) { - delegate = Iterators.concat( - Iterators.single((b, p) -> b.startObject(Fields.INDICES)), - getIndices().values().stream().map(indexStats -> (builder, params) -> { - builder.startObject(indexStats.getIndex()); - builder.field("uuid", indexStats.getUuid()); - if (indexStats.getHealth() != null) { - builder.field("health", indexStats.getHealth().toString().toLowerCase(Locale.ROOT)); - } - if (indexStats.getState() != null) { - builder.field("status", indexStats.getState().toString().toLowerCase(Locale.ROOT)); - } - builder.startObject("primaries"); - indexStats.getPrimaries().toXContent(builder, params); - builder.endObject(); - - builder.startObject("total"); - indexStats.getTotal().toXContent(builder, params); - builder.endObject(); - - if ("shards".equalsIgnoreCase(level)) { - builder.startObject(Fields.SHARDS); - for (IndexShardStats indexShardStats : indexStats) { - builder.startArray(Integer.toString(indexShardStats.getShardId().id())); - for (ShardStats shardStats : indexShardStats) { - builder.startObject(); - shardStats.toXContent(builder, params); - builder.endObject(); - } - builder.endArray(); - } - builder.endObject(); - } - return builder.endObject(); - }).iterator(), - Iterators.single((b, p) -> b.endObject().endObject()) - ); - } else { - delegate = Iterators.single((b, p) -> b.endObject()); - } - } - } - }); + builder.endObject(); } static final class Fields { diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java index d39503b44641e..b91ff9909e169 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java @@ -45,7 +45,7 @@ public void testInvalidLevel() { final ToXContent.Params params = new ToXContent.MapParams(Collections.singletonMap("level", level)); final IllegalArgumentException e = expectThrows( IllegalArgumentException.class, - () -> response.toXContentChunked().next().toXContent(JsonXContent.contentBuilder(), params) + () -> response.toXContentChunked(params).next().toXContent(JsonXContent.contentBuilder(), params) ); assertThat( e, @@ -127,24 +127,24 @@ public void testChunkedEncodingPerIndex() throws IOException { null, ClusterState.EMPTY_STATE ); - final var iteratorClusterLevel = indicesStatsResponse.toXContentChunked(); final ToXContent.Params paramsClusterLevel = new ToXContent.MapParams(Map.of("level", "cluster")); + final var iteratorClusterLevel = indicesStatsResponse.toXContentChunked(paramsClusterLevel); int chunksSeenClusterLevel = 0; final XContentBuilder builder = new XContentBuilder(XContentType.JSON.xContent(), Streams.NULL_OUTPUT_STREAM); while (iteratorClusterLevel.hasNext()) { iteratorClusterLevel.next().toXContent(builder, paramsClusterLevel); chunksSeenClusterLevel++; } - assertEquals(2, chunksSeenClusterLevel); + assertEquals(1, chunksSeenClusterLevel); - final var iteratorIndexLevel = indicesStatsResponse.toXContentChunked(); final ToXContent.Params paramsIndexLevel = new ToXContent.MapParams(Map.of("level", "indices")); + final var iteratorIndexLevel = indicesStatsResponse.toXContentChunked(paramsIndexLevel); int chunksSeenIndexLevel = 0; while (iteratorIndexLevel.hasNext()) { iteratorIndexLevel.next().toXContent(builder, paramsIndexLevel); chunksSeenIndexLevel++; } - assertEquals(3 + shards, chunksSeenIndexLevel); + assertEquals(2 + shards, chunksSeenIndexLevel); } private ShardRouting createShardRouting(ShardId shardId, boolean isPrimary) { From a775d47499aa71c318cca191b36111b610331c86 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 24 Nov 2022 15:14:52 +0100 Subject: [PATCH 3/4] chunked base class --- .../segments/IndicesSegmentResponse.java | 126 +++++++++--------- .../indices/stats/IndicesStatsResponse.java | 20 ++- .../broadcast/ChunkedBroadCastResponse.java | 45 +++++++ .../segments/IndicesSegmentResponseTests.java | 2 +- .../stats/IndicesStatsResponseTests.java | 4 +- 5 files changed, 118 insertions(+), 79 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/support/broadcast/ChunkedBroadCastResponse.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java index c0a7db8460433..9ae232a3e7848 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java @@ -13,15 +13,13 @@ import org.apache.lucene.search.SortedNumericSortField; import org.apache.lucene.search.SortedSetSortField; import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse; +import org.elasticsearch.action.support.broadcast.ChunkedBroadCastResponse; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.unit.ByteSizeValue; -import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.index.engine.Segment; -import org.elasticsearch.rest.action.RestActions; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; @@ -33,7 +31,7 @@ import java.util.Locale; import java.util.Map; -public class IndicesSegmentResponse extends BaseBroadcastResponse implements ChunkedToXContent { +public class IndicesSegmentResponse extends ChunkedBroadCastResponse { private final ShardSegments[] shards; @@ -79,72 +77,72 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public Iterator toXContentChunked(ToXContent.Params outerParams) { - return Iterators.concat(Iterators.single(((builder, params) -> { - builder.startObject(); - RestActions.buildBroadcastShardsHeader(builder, params, this); - return builder.startObject(Fields.INDICES); - })), getIndices().values().stream().map(indexSegments -> (ToXContent) (builder, params) -> { - builder.startObject(indexSegments.getIndex()); - - builder.startObject(Fields.SHARDS); - for (IndexShardSegments indexSegment : indexSegments) { - builder.startArray(Integer.toString(indexSegment.shardId().id())); - for (ShardSegments shardSegments : indexSegment) { - builder.startObject(); - - builder.startObject(Fields.ROUTING); - builder.field(Fields.STATE, shardSegments.getShardRouting().state()); - builder.field(Fields.PRIMARY, shardSegments.getShardRouting().primary()); - builder.field(Fields.NODE, shardSegments.getShardRouting().currentNodeId()); - if (shardSegments.getShardRouting().relocatingNodeId() != null) { - builder.field(Fields.RELOCATING_NODE, shardSegments.getShardRouting().relocatingNodeId()); - } - builder.endObject(); - - builder.field(Fields.NUM_COMMITTED_SEGMENTS, shardSegments.getNumberOfCommitted()); - builder.field(Fields.NUM_SEARCH_SEGMENTS, shardSegments.getNumberOfSearch()); - - builder.startObject(Fields.SEGMENTS); - for (Segment segment : shardSegments) { - builder.startObject(segment.getName()); - builder.field(Fields.GENERATION, segment.getGeneration()); - builder.field(Fields.NUM_DOCS, segment.getNumDocs()); - builder.field(Fields.DELETED_DOCS, segment.getDeletedDocs()); - builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, segment.getSize()); - if (builder.getRestApiVersion() == RestApiVersion.V_7) { - builder.humanReadableField(Fields.MEMORY_IN_BYTES, Fields.MEMORY, ByteSizeValue.ZERO); - } - builder.field(Fields.COMMITTED, segment.isCommitted()); - builder.field(Fields.SEARCH, segment.isSearch()); - if (segment.getVersion() != null) { - builder.field(Fields.VERSION, segment.getVersion()); - } - if (segment.isCompound() != null) { - builder.field(Fields.COMPOUND, segment.isCompound()); + protected Iterator customXContentChunks(ToXContent.Params params) { + return Iterators.concat( + Iterators.single((builder, p) -> builder.startObject(Fields.INDICES)), + getIndices().values().stream().map(indexSegments -> (ToXContent) (builder, p) -> { + builder.startObject(indexSegments.getIndex()); + + builder.startObject(Fields.SHARDS); + for (IndexShardSegments indexSegment : indexSegments) { + builder.startArray(Integer.toString(indexSegment.shardId().id())); + for (ShardSegments shardSegments : indexSegment) { + builder.startObject(); + + builder.startObject(Fields.ROUTING); + builder.field(Fields.STATE, shardSegments.getShardRouting().state()); + builder.field(Fields.PRIMARY, shardSegments.getShardRouting().primary()); + builder.field(Fields.NODE, shardSegments.getShardRouting().currentNodeId()); + if (shardSegments.getShardRouting().relocatingNodeId() != null) { + builder.field(Fields.RELOCATING_NODE, shardSegments.getShardRouting().relocatingNodeId()); } - if (segment.getMergeId() != null) { - builder.field(Fields.MERGE_ID, segment.getMergeId()); - } - if (segment.getSegmentSort() != null) { - toXContent(builder, segment.getSegmentSort()); - } - if (segment.attributes != null && segment.attributes.isEmpty() == false) { - builder.field("attributes", segment.attributes); + builder.endObject(); + + builder.field(Fields.NUM_COMMITTED_SEGMENTS, shardSegments.getNumberOfCommitted()); + builder.field(Fields.NUM_SEARCH_SEGMENTS, shardSegments.getNumberOfSearch()); + + builder.startObject(Fields.SEGMENTS); + for (Segment segment : shardSegments) { + builder.startObject(segment.getName()); + builder.field(Fields.GENERATION, segment.getGeneration()); + builder.field(Fields.NUM_DOCS, segment.getNumDocs()); + builder.field(Fields.DELETED_DOCS, segment.getDeletedDocs()); + builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, segment.getSize()); + if (builder.getRestApiVersion() == RestApiVersion.V_7) { + builder.humanReadableField(Fields.MEMORY_IN_BYTES, Fields.MEMORY, ByteSizeValue.ZERO); + } + builder.field(Fields.COMMITTED, segment.isCommitted()); + builder.field(Fields.SEARCH, segment.isSearch()); + if (segment.getVersion() != null) { + builder.field(Fields.VERSION, segment.getVersion()); + } + if (segment.isCompound() != null) { + builder.field(Fields.COMPOUND, segment.isCompound()); + } + if (segment.getMergeId() != null) { + builder.field(Fields.MERGE_ID, segment.getMergeId()); + } + if (segment.getSegmentSort() != null) { + toXContent(builder, segment.getSegmentSort()); + } + if (segment.attributes != null && segment.attributes.isEmpty() == false) { + builder.field("attributes", segment.attributes); + } + builder.endObject(); } builder.endObject(); - } - builder.endObject(); - builder.endObject(); + builder.endObject(); + } + builder.endArray(); } - builder.endArray(); - } - builder.endObject(); + builder.endObject(); - builder.endObject(); - return builder; - }).iterator(), Iterators.single((builder, params) -> builder.endObject().endObject())); + builder.endObject(); + return builder; + }).iterator(), + Iterators.single((builder, p) -> builder.endObject()) + ); } private static void toXContent(XContentBuilder builder, Sort sort) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java index b01db2a925839..252acfe354c5b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java @@ -11,7 +11,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.stats.IndexStats.IndexStatsBuilder; import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse; +import org.elasticsearch.action.support.broadcast.ChunkedBroadCastResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterIndexHealth; @@ -21,9 +21,7 @@ import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.common.xcontent.ChunkedToXContent; import org.elasticsearch.index.Index; -import org.elasticsearch.rest.action.RestActions; import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentBuilder; @@ -38,7 +36,7 @@ import static java.util.Collections.unmodifiableMap; -public class IndicesStatsResponse extends BaseBroadcastResponse implements ChunkedToXContent { +public class IndicesStatsResponse extends ChunkedBroadCastResponse { private final Map indexHealthMap; @@ -176,7 +174,7 @@ public void writeTo(StreamOutput out) throws IOException { } @Override - public Iterator toXContentChunked(ToXContent.Params params) { + protected Iterator customXContentChunks(ToXContent.Params params) { final String level = params.param("level", "indices"); final boolean isLevelValid = "cluster".equalsIgnoreCase(level) || "indices".equalsIgnoreCase(level) @@ -186,7 +184,7 @@ public Iterator toXContentChunked(ToXContent.Params params } if ("indices".equalsIgnoreCase(level) || "shards".equalsIgnoreCase(level)) { return Iterators.concat(Iterators.single(((builder, p) -> { - headerAndCommonStats(builder, p); + commonStats(builder, p); return builder.startObject(Fields.INDICES); })), getIndices().values().stream().map(indexStats -> (builder, p) -> { builder.startObject(indexStats.getIndex()); @@ -219,17 +217,15 @@ public Iterator toXContentChunked(ToXContent.Params params builder.endObject(); } return builder.endObject(); - }).iterator(), Iterators.single((b, p) -> b.endObject().endObject())); + }).iterator(), Iterators.single((b, p) -> b.endObject())); } return Iterators.single((b, p) -> { - headerAndCommonStats(b, p); - return b.endObject(); + commonStats(b, p); + return b; }); } - private void headerAndCommonStats(XContentBuilder builder, ToXContent.Params p) throws IOException { - builder.startObject(); - RestActions.buildBroadcastShardsHeader(builder, p, this); + private void commonStats(XContentBuilder builder, ToXContent.Params p) throws IOException { builder.startObject("_all"); builder.startObject("primaries"); diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/ChunkedBroadCastResponse.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/ChunkedBroadCastResponse.java new file mode 100644 index 0000000000000..316b64f519955 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/ChunkedBroadCastResponse.java @@ -0,0 +1,45 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ +package org.elasticsearch.action.support.broadcast; + +import org.elasticsearch.action.support.DefaultShardOperationFailedException; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.ChunkedToXContent; +import org.elasticsearch.rest.action.RestActions; +import org.elasticsearch.xcontent.ToXContent; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +public abstract class ChunkedBroadCastResponse extends BaseBroadcastResponse implements ChunkedToXContent { + public ChunkedBroadCastResponse(StreamInput in) throws IOException { + super(in); + } + + public ChunkedBroadCastResponse( + int totalShards, + int successfulShards, + int failedShards, + List shardFailures + ) { + super(totalShards, successfulShards, failedShards, shardFailures); + } + + @Override + public final Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.concat(Iterators.single((b, p) -> { + b.startObject(); + RestActions.buildBroadcastShardsHeader(b, p, this); + return b; + }), customXContentChunks(params), Iterators.single((builder, p) -> builder.endObject())); + } + + protected abstract Iterator customXContentChunks(ToXContent.Params params); +} diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponseTests.java index 2c211a66c7b28..012b24330ed3b 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponseTests.java @@ -74,6 +74,6 @@ public void testSerializesOneChunkPerIndex() { iterator.next(); chunks++; } - assertEquals(indices + 2, chunks); + assertEquals(indices + 4, chunks); } } diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java index b91ff9909e169..cec44f8fd0636 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponseTests.java @@ -135,7 +135,7 @@ public void testChunkedEncodingPerIndex() throws IOException { iteratorClusterLevel.next().toXContent(builder, paramsClusterLevel); chunksSeenClusterLevel++; } - assertEquals(1, chunksSeenClusterLevel); + assertEquals(3, chunksSeenClusterLevel); final ToXContent.Params paramsIndexLevel = new ToXContent.MapParams(Map.of("level", "indices")); final var iteratorIndexLevel = indicesStatsResponse.toXContentChunked(paramsIndexLevel); @@ -144,7 +144,7 @@ public void testChunkedEncodingPerIndex() throws IOException { iteratorIndexLevel.next().toXContent(builder, paramsIndexLevel); chunksSeenIndexLevel++; } - assertEquals(2 + shards, chunksSeenIndexLevel); + assertEquals(4 + shards, chunksSeenIndexLevel); } private ShardRouting createShardRouting(ShardId shardId, boolean isPrimary) { From 49b8cdd18ca7a0bbb6579f4ae8319a03def3dd90 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Thu, 24 Nov 2022 16:55:51 +0100 Subject: [PATCH 4/4] fix case --- .../admin/indices/segments/IndicesSegmentResponse.java | 4 ++-- .../action/admin/indices/stats/IndicesStatsResponse.java | 4 ++-- ...BroadCastResponse.java => ChunkedBroadcastResponse.java} | 6 +++--- 3 files changed, 7 insertions(+), 7 deletions(-) rename server/src/main/java/org/elasticsearch/action/support/broadcast/{ChunkedBroadCastResponse.java => ChunkedBroadcastResponse.java} (90%) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java index 9ae232a3e7848..5377a5af883fb 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/segments/IndicesSegmentResponse.java @@ -13,7 +13,7 @@ import org.apache.lucene.search.SortedNumericSortField; import org.apache.lucene.search.SortedSetSortField; import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.ChunkedBroadCastResponse; +import org.elasticsearch.action.support.broadcast.ChunkedBroadcastResponse; import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -31,7 +31,7 @@ import java.util.Locale; import java.util.Map; -public class IndicesSegmentResponse extends ChunkedBroadCastResponse { +public class IndicesSegmentResponse extends ChunkedBroadcastResponse { private final ShardSegments[] shards; diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java index 252acfe354c5b..85c28d57820bc 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/stats/IndicesStatsResponse.java @@ -11,7 +11,7 @@ import org.elasticsearch.Version; import org.elasticsearch.action.admin.indices.stats.IndexStats.IndexStatsBuilder; import org.elasticsearch.action.support.DefaultShardOperationFailedException; -import org.elasticsearch.action.support.broadcast.ChunkedBroadCastResponse; +import org.elasticsearch.action.support.broadcast.ChunkedBroadcastResponse; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.cluster.health.ClusterIndexHealth; @@ -36,7 +36,7 @@ import static java.util.Collections.unmodifiableMap; -public class IndicesStatsResponse extends ChunkedBroadCastResponse { +public class IndicesStatsResponse extends ChunkedBroadcastResponse { private final Map indexHealthMap; diff --git a/server/src/main/java/org/elasticsearch/action/support/broadcast/ChunkedBroadCastResponse.java b/server/src/main/java/org/elasticsearch/action/support/broadcast/ChunkedBroadcastResponse.java similarity index 90% rename from server/src/main/java/org/elasticsearch/action/support/broadcast/ChunkedBroadCastResponse.java rename to server/src/main/java/org/elasticsearch/action/support/broadcast/ChunkedBroadcastResponse.java index 316b64f519955..d65879578b995 100644 --- a/server/src/main/java/org/elasticsearch/action/support/broadcast/ChunkedBroadCastResponse.java +++ b/server/src/main/java/org/elasticsearch/action/support/broadcast/ChunkedBroadcastResponse.java @@ -18,12 +18,12 @@ import java.util.Iterator; import java.util.List; -public abstract class ChunkedBroadCastResponse extends BaseBroadcastResponse implements ChunkedToXContent { - public ChunkedBroadCastResponse(StreamInput in) throws IOException { +public abstract class ChunkedBroadcastResponse extends BaseBroadcastResponse implements ChunkedToXContent { + public ChunkedBroadcastResponse(StreamInput in) throws IOException { super(in); } - public ChunkedBroadCastResponse( + public ChunkedBroadcastResponse( int totalShards, int successfulShards, int failedShards,