Skip to content

Commit

Permalink
Use chunked encoding for indices stats response (#91760)
Browse files Browse the repository at this point in the history
These responses can become huge, lets chunk them by index.
  • Loading branch information
original-brownbear authored Nov 24, 2022
1 parent c66ac71 commit 7dbc1ea
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,13 @@
import org.apache.lucene.search.SortedNumericSortField;
import org.apache.lucene.search.SortedSetSortField;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse;
import org.elasticsearch.action.support.broadcast.ChunkedBroadcastResponse;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.index.engine.Segment;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

Expand All @@ -33,7 +31,7 @@
import java.util.Locale;
import java.util.Map;

public class IndicesSegmentResponse extends BaseBroadcastResponse implements ChunkedToXContent {
public class IndicesSegmentResponse extends ChunkedBroadcastResponse {

private final ShardSegments[] shards;

Expand Down Expand Up @@ -79,72 +77,72 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
return Iterators.concat(Iterators.single(((builder, params) -> {
builder.startObject();
RestActions.buildBroadcastShardsHeader(builder, params, this);
return builder.startObject(Fields.INDICES);
})), getIndices().values().stream().map(indexSegments -> (ToXContent) (builder, params) -> {
builder.startObject(indexSegments.getIndex());

builder.startObject(Fields.SHARDS);
for (IndexShardSegments indexSegment : indexSegments) {
builder.startArray(Integer.toString(indexSegment.shardId().id()));
for (ShardSegments shardSegments : indexSegment) {
builder.startObject();

builder.startObject(Fields.ROUTING);
builder.field(Fields.STATE, shardSegments.getShardRouting().state());
builder.field(Fields.PRIMARY, shardSegments.getShardRouting().primary());
builder.field(Fields.NODE, shardSegments.getShardRouting().currentNodeId());
if (shardSegments.getShardRouting().relocatingNodeId() != null) {
builder.field(Fields.RELOCATING_NODE, shardSegments.getShardRouting().relocatingNodeId());
}
builder.endObject();

builder.field(Fields.NUM_COMMITTED_SEGMENTS, shardSegments.getNumberOfCommitted());
builder.field(Fields.NUM_SEARCH_SEGMENTS, shardSegments.getNumberOfSearch());

builder.startObject(Fields.SEGMENTS);
for (Segment segment : shardSegments) {
builder.startObject(segment.getName());
builder.field(Fields.GENERATION, segment.getGeneration());
builder.field(Fields.NUM_DOCS, segment.getNumDocs());
builder.field(Fields.DELETED_DOCS, segment.getDeletedDocs());
builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, segment.getSize());
if (builder.getRestApiVersion() == RestApiVersion.V_7) {
builder.humanReadableField(Fields.MEMORY_IN_BYTES, Fields.MEMORY, ByteSizeValue.ZERO);
}
builder.field(Fields.COMMITTED, segment.isCommitted());
builder.field(Fields.SEARCH, segment.isSearch());
if (segment.getVersion() != null) {
builder.field(Fields.VERSION, segment.getVersion());
}
if (segment.isCompound() != null) {
builder.field(Fields.COMPOUND, segment.isCompound());
protected Iterator<ToXContent> customXContentChunks(ToXContent.Params params) {
return Iterators.concat(
Iterators.single((builder, p) -> builder.startObject(Fields.INDICES)),
getIndices().values().stream().map(indexSegments -> (ToXContent) (builder, p) -> {
builder.startObject(indexSegments.getIndex());

builder.startObject(Fields.SHARDS);
for (IndexShardSegments indexSegment : indexSegments) {
builder.startArray(Integer.toString(indexSegment.shardId().id()));
for (ShardSegments shardSegments : indexSegment) {
builder.startObject();

builder.startObject(Fields.ROUTING);
builder.field(Fields.STATE, shardSegments.getShardRouting().state());
builder.field(Fields.PRIMARY, shardSegments.getShardRouting().primary());
builder.field(Fields.NODE, shardSegments.getShardRouting().currentNodeId());
if (shardSegments.getShardRouting().relocatingNodeId() != null) {
builder.field(Fields.RELOCATING_NODE, shardSegments.getShardRouting().relocatingNodeId());
}
if (segment.getMergeId() != null) {
builder.field(Fields.MERGE_ID, segment.getMergeId());
}
if (segment.getSegmentSort() != null) {
toXContent(builder, segment.getSegmentSort());
}
if (segment.attributes != null && segment.attributes.isEmpty() == false) {
builder.field("attributes", segment.attributes);
builder.endObject();

builder.field(Fields.NUM_COMMITTED_SEGMENTS, shardSegments.getNumberOfCommitted());
builder.field(Fields.NUM_SEARCH_SEGMENTS, shardSegments.getNumberOfSearch());

builder.startObject(Fields.SEGMENTS);
for (Segment segment : shardSegments) {
builder.startObject(segment.getName());
builder.field(Fields.GENERATION, segment.getGeneration());
builder.field(Fields.NUM_DOCS, segment.getNumDocs());
builder.field(Fields.DELETED_DOCS, segment.getDeletedDocs());
builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, segment.getSize());
if (builder.getRestApiVersion() == RestApiVersion.V_7) {
builder.humanReadableField(Fields.MEMORY_IN_BYTES, Fields.MEMORY, ByteSizeValue.ZERO);
}
builder.field(Fields.COMMITTED, segment.isCommitted());
builder.field(Fields.SEARCH, segment.isSearch());
if (segment.getVersion() != null) {
builder.field(Fields.VERSION, segment.getVersion());
}
if (segment.isCompound() != null) {
builder.field(Fields.COMPOUND, segment.isCompound());
}
if (segment.getMergeId() != null) {
builder.field(Fields.MERGE_ID, segment.getMergeId());
}
if (segment.getSegmentSort() != null) {
toXContent(builder, segment.getSegmentSort());
}
if (segment.attributes != null && segment.attributes.isEmpty() == false) {
builder.field("attributes", segment.attributes);
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();

builder.endObject();
builder.endObject();
}
builder.endArray();
}
builder.endArray();
}
builder.endObject();
builder.endObject();

builder.endObject();
return builder;
}).iterator(), Iterators.single((builder, params) -> builder.endObject().endObject()));
builder.endObject();
return builder;
}).iterator(),
Iterators.single((builder, p) -> builder.endObject())
);
}

private static void toXContent(XContentBuilder builder, Sort sort) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,23 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.indices.stats.IndexStats.IndexStatsBuilder;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.action.support.broadcast.ChunkedBroadcastResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.index.Index;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand All @@ -33,7 +36,7 @@

import static java.util.Collections.unmodifiableMap;

public class IndicesStatsResponse extends BroadcastResponse {
public class IndicesStatsResponse extends ChunkedBroadcastResponse {

private final Map<String, ClusterHealthStatus> indexHealthMap;

Expand Down Expand Up @@ -171,30 +174,19 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
protected void addCustomXContentFields(XContentBuilder builder, Params params) throws IOException {
protected Iterator<ToXContent> customXContentChunks(ToXContent.Params params) {
final String level = params.param("level", "indices");
final boolean isLevelValid = "cluster".equalsIgnoreCase(level)
|| "indices".equalsIgnoreCase(level)
|| "shards".equalsIgnoreCase(level);
if (isLevelValid == false) {
throw new IllegalArgumentException("level parameter must be one of [cluster] or [indices] or [shards] but was [" + level + "]");
}

builder.startObject("_all");

builder.startObject("primaries");
getPrimaries().toXContent(builder, params);
builder.endObject();

builder.startObject("total");
getTotal().toXContent(builder, params);
builder.endObject();

builder.endObject();

if ("indices".equalsIgnoreCase(level) || "shards".equalsIgnoreCase(level)) {
builder.startObject(Fields.INDICES);
for (IndexStats indexStats : getIndices().values()) {
return Iterators.concat(Iterators.single(((builder, p) -> {
commonStats(builder, p);
return builder.startObject(Fields.INDICES);
})), getIndices().values().stream().<ToXContent>map(indexStats -> (builder, p) -> {
builder.startObject(indexStats.getIndex());
builder.field("uuid", indexStats.getUuid());
if (indexStats.getHealth() != null) {
Expand All @@ -204,11 +196,11 @@ protected void addCustomXContentFields(XContentBuilder builder, Params params) t
builder.field("status", indexStats.getState().toString().toLowerCase(Locale.ROOT));
}
builder.startObject("primaries");
indexStats.getPrimaries().toXContent(builder, params);
indexStats.getPrimaries().toXContent(builder, p);
builder.endObject();

builder.startObject("total");
indexStats.getTotal().toXContent(builder, params);
indexStats.getTotal().toXContent(builder, p);
builder.endObject();

if ("shards".equalsIgnoreCase(level)) {
Expand All @@ -217,17 +209,34 @@ protected void addCustomXContentFields(XContentBuilder builder, Params params) t
builder.startArray(Integer.toString(indexShardStats.getShardId().id()));
for (ShardStats shardStats : indexShardStats) {
builder.startObject();
shardStats.toXContent(builder, params);
shardStats.toXContent(builder, p);
builder.endObject();
}
builder.endArray();
}
builder.endObject();
}
builder.endObject();
}
builder.endObject();
return builder.endObject();
}).iterator(), Iterators.single((b, p) -> b.endObject()));
}
return Iterators.single((b, p) -> {
commonStats(b, p);
return b;
});
}

private void commonStats(XContentBuilder builder, ToXContent.Params p) throws IOException {
builder.startObject("_all");

builder.startObject("primaries");
getPrimaries().toXContent(builder, p);
builder.endObject();

builder.startObject("total");
getTotal().toXContent(builder, p);
builder.endObject();

builder.endObject();
}

static final class Fields {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/
package org.elasticsearch.action.support.broadcast;

import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public abstract class ChunkedBroadcastResponse extends BaseBroadcastResponse implements ChunkedToXContent {
public ChunkedBroadcastResponse(StreamInput in) throws IOException {
super(in);
}

public ChunkedBroadcastResponse(
int totalShards,
int successfulShards,
int failedShards,
List<DefaultShardOperationFailedException> shardFailures
) {
super(totalShards, successfulShards, failedShards, shardFailures);
}

@Override
public final Iterator<ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(Iterators.single((b, p) -> {
b.startObject();
RestActions.buildBroadcastShardsHeader(b, p, this);
return b;
}), customXContentChunks(params), Iterators.single((builder, p) -> builder.endObject()));
}

protected abstract Iterator<ToXContent> customXContentChunks(ToXContent.Params params);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
import org.elasticsearch.rest.action.document.RestMultiTermVectorsAction;

import java.io.IOException;
Expand Down Expand Up @@ -140,7 +140,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC

return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
.indices()
.stats(indicesStatsRequest, new RestToXContentListener<>(channel));
.stats(indicesStatsRequest, new RestChunkedToXContentListener<>(channel));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,6 @@ public void testSerializesOneChunkPerIndex() {
iterator.next();
chunks++;
}
assertEquals(indices + 2, chunks);
assertEquals(indices + 4, chunks);
}
}
Loading

0 comments on commit 7dbc1ea

Please sign in to comment.