Use chunked encoding for indices stats response #91760

Merged
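This PR moves the indices stats response onto the chunked ToXContent serialization already used in the broadcast-response family: instead of rendering the whole response in one XContentBuilder pass, the response exposes an Iterator<ToXContent> whose elements the REST layer serializes and flushes one at a time, so the memory held for a response stays proportional to a single chunk rather than to the whole cluster. A minimal sketch of the pattern (ExampleChunkedResponse and its field are invented for illustration; the real implementations follow in the diff below):

import java.util.Iterator;
import java.util.List;

import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.xcontent.ToXContent;

public class ExampleChunkedResponse implements ChunkedToXContent {

    private final List<String> indexNames;

    public ExampleChunkedResponse(List<String> indexNames) {
        this.indexNames = indexNames;
    }

    @Override
    public Iterator<ToXContent> toXContentChunked(ToXContent.Params params) {
        return Iterators.concat(
            // first chunk: open the response object
            Iterators.single((builder, p) -> builder.startObject()),
            // one chunk per index, so no single buffer ever holds the whole body
            indexNames.stream().<ToXContent>map(name -> (builder, p) -> builder.field(name, "...")).iterator(),
            // last chunk: close the response object
            Iterators.single((builder, p) -> builder.endObject())
        );
    }
}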
IndicesSegmentResponse.java

@@ -13,15 +13,13 @@
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.SortedSetSortField;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
-import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse;
+import org.elasticsearch.action.support.broadcast.ChunkedBroadcastResponse;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
-import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.index.engine.Segment;
-import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

@@ -33,7 +31,7 @@
import java.util.Locale;
import java.util.Map;

-public class IndicesSegmentResponse extends BaseBroadcastResponse implements ChunkedToXContent {
+public class IndicesSegmentResponse extends ChunkedBroadcastResponse {

private final ShardSegments[] shards;

@@ -79,72 +77,72 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
-    public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
-        return Iterators.concat(Iterators.single(((builder, params) -> {
-            builder.startObject();
-            RestActions.buildBroadcastShardsHeader(builder, params, this);
-            return builder.startObject(Fields.INDICES);
-        })), getIndices().values().stream().map(indexSegments -> (ToXContent) (builder, params) -> {
-            builder.startObject(indexSegments.getIndex());
-
-            builder.startObject(Fields.SHARDS);
-            for (IndexShardSegments indexSegment : indexSegments) {
-                builder.startArray(Integer.toString(indexSegment.shardId().id()));
-                for (ShardSegments shardSegments : indexSegment) {
-                    builder.startObject();
-
-                    builder.startObject(Fields.ROUTING);
-                    builder.field(Fields.STATE, shardSegments.getShardRouting().state());
-                    builder.field(Fields.PRIMARY, shardSegments.getShardRouting().primary());
-                    builder.field(Fields.NODE, shardSegments.getShardRouting().currentNodeId());
-                    if (shardSegments.getShardRouting().relocatingNodeId() != null) {
-                        builder.field(Fields.RELOCATING_NODE, shardSegments.getShardRouting().relocatingNodeId());
-                    }
-                    builder.endObject();
-
-                    builder.field(Fields.NUM_COMMITTED_SEGMENTS, shardSegments.getNumberOfCommitted());
-                    builder.field(Fields.NUM_SEARCH_SEGMENTS, shardSegments.getNumberOfSearch());
-
-                    builder.startObject(Fields.SEGMENTS);
-                    for (Segment segment : shardSegments) {
-                        builder.startObject(segment.getName());
-                        builder.field(Fields.GENERATION, segment.getGeneration());
-                        builder.field(Fields.NUM_DOCS, segment.getNumDocs());
-                        builder.field(Fields.DELETED_DOCS, segment.getDeletedDocs());
-                        builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, segment.getSize());
-                        if (builder.getRestApiVersion() == RestApiVersion.V_7) {
-                            builder.humanReadableField(Fields.MEMORY_IN_BYTES, Fields.MEMORY, ByteSizeValue.ZERO);
-                        }
-                        builder.field(Fields.COMMITTED, segment.isCommitted());
-                        builder.field(Fields.SEARCH, segment.isSearch());
-                        if (segment.getVersion() != null) {
-                            builder.field(Fields.VERSION, segment.getVersion());
-                        }
-                        if (segment.isCompound() != null) {
-                            builder.field(Fields.COMPOUND, segment.isCompound());
-                        }
-                        if (segment.getMergeId() != null) {
-                            builder.field(Fields.MERGE_ID, segment.getMergeId());
-                        }
-                        if (segment.getSegmentSort() != null) {
-                            toXContent(builder, segment.getSegmentSort());
-                        }
-                        if (segment.attributes != null && segment.attributes.isEmpty() == false) {
-                            builder.field("attributes", segment.attributes);
-                        }
-                        builder.endObject();
-                    }
-                    builder.endObject();
-
-                    builder.endObject();
-                }
-                builder.endArray();
-            }
-            builder.endObject();
-
-            builder.endObject();
-            return builder;
-        }).iterator(), Iterators.single((builder, params) -> builder.endObject().endObject()));
+    protected Iterator<ToXContent> customXContentChunks(ToXContent.Params params) {
+        return Iterators.concat(
+            Iterators.single((builder, p) -> builder.startObject(Fields.INDICES)),
+            getIndices().values().stream().map(indexSegments -> (ToXContent) (builder, p) -> {
+                builder.startObject(indexSegments.getIndex());
+
+                builder.startObject(Fields.SHARDS);
+                for (IndexShardSegments indexSegment : indexSegments) {
+                    builder.startArray(Integer.toString(indexSegment.shardId().id()));
+                    for (ShardSegments shardSegments : indexSegment) {
+                        builder.startObject();
+
+                        builder.startObject(Fields.ROUTING);
+                        builder.field(Fields.STATE, shardSegments.getShardRouting().state());
+                        builder.field(Fields.PRIMARY, shardSegments.getShardRouting().primary());
+                        builder.field(Fields.NODE, shardSegments.getShardRouting().currentNodeId());
+                        if (shardSegments.getShardRouting().relocatingNodeId() != null) {
+                            builder.field(Fields.RELOCATING_NODE, shardSegments.getShardRouting().relocatingNodeId());
+                        }
+                        builder.endObject();
+
+                        builder.field(Fields.NUM_COMMITTED_SEGMENTS, shardSegments.getNumberOfCommitted());
+                        builder.field(Fields.NUM_SEARCH_SEGMENTS, shardSegments.getNumberOfSearch());
+
+                        builder.startObject(Fields.SEGMENTS);
+                        for (Segment segment : shardSegments) {
+                            builder.startObject(segment.getName());
+                            builder.field(Fields.GENERATION, segment.getGeneration());
+                            builder.field(Fields.NUM_DOCS, segment.getNumDocs());
+                            builder.field(Fields.DELETED_DOCS, segment.getDeletedDocs());
+                            builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, segment.getSize());
+                            if (builder.getRestApiVersion() == RestApiVersion.V_7) {
+                                builder.humanReadableField(Fields.MEMORY_IN_BYTES, Fields.MEMORY, ByteSizeValue.ZERO);
+                            }
+                            builder.field(Fields.COMMITTED, segment.isCommitted());
+                            builder.field(Fields.SEARCH, segment.isSearch());
+                            if (segment.getVersion() != null) {
+                                builder.field(Fields.VERSION, segment.getVersion());
+                            }
+                            if (segment.isCompound() != null) {
+                                builder.field(Fields.COMPOUND, segment.isCompound());
+                            }
+                            if (segment.getMergeId() != null) {
+                                builder.field(Fields.MERGE_ID, segment.getMergeId());
+                            }
+                            if (segment.getSegmentSort() != null) {
+                                toXContent(builder, segment.getSegmentSort());
+                            }
+                            if (segment.attributes != null && segment.attributes.isEmpty() == false) {
+                                builder.field("attributes", segment.attributes);
+                            }
+                            builder.endObject();
+                        }
+                        builder.endObject();
+
+                        builder.endObject();
+                    }
+                    builder.endArray();
+                }
+                builder.endObject();

+                builder.endObject();
+                return builder;
+            }).iterator(),
+            Iterators.single((builder, p) -> builder.endObject())
+        );
}

private static void toXContent(XContentBuilder builder, Sort sort) throws IOException {

IndicesStatsResponse.java

@@ -11,20 +11,23 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.stats.IndexStats.IndexStatsBuilder;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
-import org.elasticsearch.action.support.broadcast.BroadcastResponse;
+import org.elasticsearch.action.support.broadcast.ChunkedBroadcastResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.Index;
+import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -33,7 +36,7 @@

import static java.util.Collections.unmodifiableMap;

-public class IndicesStatsResponse extends BroadcastResponse {
+public class IndicesStatsResponse extends ChunkedBroadcastResponse {

private final Map<String, ClusterHealthStatus> indexHealthMap;

@@ -171,30 +174,19 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
-    protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException {
+    protected Iterator<ToXContent> customXContentChunks(ToXContent.Params params) {
final String level = params.param("level", "indices");
final boolean isLevelValid = "cluster".equalsIgnoreCase(level)
|| "indices".equalsIgnoreCase(level)
|| "shards".equalsIgnoreCase(level);
if (isLevelValid == false) {
throw new IllegalArgumentException("level parameter must be one of [cluster] or [indices] or [shards] but was [" + level + "]");
}

builder.startObject("_all");

builder.startObject("primaries");
getPrimaries().toXContent(builder, params);
builder.endObject();

builder.startObject("total");
getTotal().toXContent(builder, params);
builder.endObject();

builder.endObject();

if ("indices".equalsIgnoreCase(level) || "shards".equalsIgnoreCase(level)) {
-            builder.startObject(Fields.INDICES);
-            for (IndexStats indexStats : getIndices().values()) {
+            return Iterators.concat(Iterators.single(((builder, p) -> {
+                commonStats(builder, p);
+                return builder.startObject(Fields.INDICES);
+            })), getIndices().values().stream().<ToXContent>map(indexStats -> (builder, p) -> {
builder.startObject(indexStats.getIndex());
builder.field("uuid", indexStats.getUuid());
if (indexStats.getHealth() != null) {
@@ -204,11 +196,11 @@ protected void addCustomXContentFields(XContentBuilder builder, Params params) t
builder.field("status", indexStats.getState().toString().toLowerCase(Locale.ROOT));
}
builder.startObject("primaries");
-                indexStats.getPrimaries().toXContent(builder, params);
+                indexStats.getPrimaries().toXContent(builder, p);
builder.endObject();

builder.startObject("total");
-                indexStats.getTotal().toXContent(builder, params);
+                indexStats.getTotal().toXContent(builder, p);
builder.endObject();

if ("shards".equalsIgnoreCase(level)) {
@@ -217,17 +209,34 @@ protected void addCustomXContentFields(XContentBuilder builder, Params params) t
builder.startArray(Integer.toString(indexShardStats.getShardId().id()));
for (ShardStats shardStats : indexShardStats) {
builder.startObject();
-                            shardStats.toXContent(builder, params);
+                            shardStats.toXContent(builder, p);
builder.endObject();
}
builder.endArray();
}
builder.endObject();
}
-                builder.endObject();
-            }
-            builder.endObject();
+                return builder.endObject();
+            }).iterator(), Iterators.single((b, p) -> b.endObject()));
        }
+        return Iterators.single((b, p) -> {
+            commonStats(b, p);
+            return b;
+        });
    }

+    private void commonStats(XContentBuilder builder, ToXContent.Params p) throws IOException {
+        builder.startObject("_all");
+
+        builder.startObject("primaries");
+        getPrimaries().toXContent(builder, p);
+        builder.endObject();
+
+        builder.startObject("total");
+        getTotal().toXContent(builder, p);
+        builder.endObject();
+
+        builder.endObject();
+    }

static final class Fields {
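Note the two return paths in customXContentChunks above: at level=cluster the response collapses to a single chunk that writes only the _all summary, while at level=indices or level=shards the first chunk writes the _all summary and opens the indices object, followed by one chunk per index and a final closing chunk. Per-shard output stays inside its index's chunk rather than becoming a chunk of its own.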

ChunkedBroadcastResponse.java (new file)

@@ -0,0 +1,45 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0 and the Server Side Public License, v 1; you may not use this file except
+ * in compliance with, at your election, the Elastic License 2.0 or the Server
+ * Side Public License, v 1.
+ */
+package org.elasticsearch.action.support.broadcast;
+
+import org.elasticsearch.action.support.DefaultShardOperationFailedException;
+import org.elasticsearch.common.collect.Iterators;
+import org.elasticsearch.common.io.stream.StreamInput;
+import org.elasticsearch.common.xcontent.ChunkedToXContent;
+import org.elasticsearch.rest.action.RestActions;
+import org.elasticsearch.xcontent.ToXContent;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+public abstract class ChunkedBroadcastResponse extends BaseBroadcastResponse implements ChunkedToXContent {
+    public ChunkedBroadcastResponse(StreamInput in) throws IOException {
+        super(in);
+    }
+
+    public ChunkedBroadcastResponse(
+        int totalShards,
+        int successfulShards,
+        int failedShards,
+        List<DefaultShardOperationFailedException> shardFailures
+    ) {
+        super(totalShards, successfulShards, failedShards, shardFailures);
+    }
+
+    @Override
+    public final Iterator<ToXContent> toXContentChunked(ToXContent.Params params) {
+        return Iterators.concat(Iterators.single((b, p) -> {
+            b.startObject();
+            RestActions.buildBroadcastShardsHeader(b, p, this);
+            return b;
+        }), customXContentChunks(params), Iterators.single((builder, p) -> builder.endObject()));
+    }
+
+    protected abstract Iterator<ToXContent> customXContentChunks(ToXContent.Params params);
+}
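ChunkedBroadcastResponse is a small template method: the final toXContentChunked contributes the leading chunk (opening the response object and writing the _shards header via RestActions.buildBroadcastShardsHeader) and the trailing chunk (closing the object), and subclasses supply only the body chunks in between. A hypothetical minimal subclass, just to show the contract (not part of this PR; it reuses the imports of the file above):

public class ExampleBroadcastResponse extends ChunkedBroadcastResponse {

    public ExampleBroadcastResponse(StreamInput in) throws IOException {
        super(in);
    }

    @Override
    protected Iterator<ToXContent> customXContentChunks(ToXContent.Params params) {
        // Everything emitted here lands between the base class's
        // startObject()/_shards-header chunk and its endObject() chunk.
        return Iterators.single((builder, p) -> builder.field("example", "value"));
    }
}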

RestIndicesStatsAction.java

@@ -19,7 +19,7 @@
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
-import org.elasticsearch.rest.action.RestToXContentListener;
+import org.elasticsearch.rest.action.RestChunkedToXContentListener;
import org.elasticsearch.rest.action.document.RestMultiTermVectorsAction;

import java.io.IOException;
@@ -140,7 +140,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC

return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
.indices()
-            .stats(indicesStatsRequest, new RestToXContentListener<>(channel));
+            .stats(indicesStatsRequest, new RestChunkedToXContentListener<>(channel));
}

@Override
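This listener swap is what makes the chunking effective on the wire: in broad terms, RestChunkedToXContentListener serializes the response's chunk iterator to the HTTP channel incrementally, whereas RestToXContentListener rendered the complete response body before sending it.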

IndicesSegmentResponseTests.java

@@ -74,6 +74,6 @@ public void testSerializesOneChunkPerIndex() {
iterator.next();
chunks++;
}
-        assertEquals(indices + 2, chunks);
+        assertEquals(indices + 4, chunks);
}
}
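The expected chunk count rises from indices + 2 to indices + 4 because the header and footer chunks moved into ChunkedBroadcastResponse and the indices object is now opened and closed by chunks of its own. Assuming the test simply drains toXContentChunked, the accounting is:

// 1         startObject() + "_shards" header  (ChunkedBroadcastResponse prefix)
// 1         startObject("indices")            (first custom chunk)
// indices   one chunk per index
// 1         endObject()                       (closes "indices")
// 1         endObject()                       (ChunkedBroadcastResponse suffix)
// total = indices + 4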