Skip to content

Commit

Permalink
Chunked Encoding for NodeStatsResponse (#90097)
Browse files Browse the repository at this point in the history
Turn this into a chunked response to some degree.
Only chunks per node for now, since deeper chunking needs
larger changes downstream that don't fit in well with the
current API.
  • Loading branch information
original-brownbear authored Sep 20, 2022
1 parent bfda66a commit 5d784d6
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,52 +40,52 @@
*/
public class NodeStats extends BaseNodeResponse implements ToXContentFragment {

private long timestamp;
private final long timestamp;

@Nullable
private NodeIndicesStats indices;

@Nullable
private OsStats os;
private final OsStats os;

@Nullable
private ProcessStats process;
private final ProcessStats process;

@Nullable
private JvmStats jvm;
private final JvmStats jvm;

@Nullable
private ThreadPoolStats threadPool;
private final ThreadPoolStats threadPool;

@Nullable
private FsInfo fs;
private final FsInfo fs;

@Nullable
private TransportStats transport;
private final TransportStats transport;

@Nullable
private HttpStats http;
private final HttpStats http;

@Nullable
private AllCircuitBreakerStats breaker;
private final AllCircuitBreakerStats breaker;

@Nullable
private ScriptStats scriptStats;
private final ScriptStats scriptStats;

@Nullable
private ScriptCacheStats scriptCacheStats;
private final ScriptCacheStats scriptCacheStats;

@Nullable
private DiscoveryStats discoveryStats;
private final DiscoveryStats discoveryStats;

@Nullable
private IngestStats ingestStats;
private final IngestStats ingestStats;

@Nullable
private AdaptiveSelectionStats adaptiveSelectionStats;
private final AdaptiveSelectionStats adaptiveSelectionStats;

@Nullable
private IndexingPressureStats indexingPressureStats;
private final IndexingPressureStats indexingPressureStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,19 @@
package org.elasticsearch.action.admin.cluster.node.stats;

import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
import org.elasticsearch.action.support.nodes.BaseNodesXContentResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public class NodesStatsResponse extends BaseNodesResponse<NodeStats> implements ToXContentFragment {
public class NodesStatsResponse extends BaseNodesXContentResponse<NodeStats> {

public NodesStatsResponse(StreamInput in) throws IOException {
super(in);
Expand All @@ -42,30 +42,21 @@ protected void writeNodesTo(StreamOutput out, List<NodeStats> nodes) throws IOEx
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject("nodes");
for (NodeStats nodeStats : getNodes()) {
builder.startObject(nodeStats.getNode().getId());
builder.field("timestamp", nodeStats.getTimestamp());
nodeStats.toXContent(builder, params);

builder.endObject();
}
builder.endObject();

return builder;
protected Iterator<? extends ToXContent> xContentChunks() {
return Iterators.concat(
Iterators.single((b, p) -> b.startObject("nodes")),
getNodes().stream().map(nodeStats -> (ToXContent) (b, p) -> {
b.startObject(nodeStats.getNode().getId());
b.field("timestamp", nodeStats.getTimestamp());
nodeStats.toXContent(b, p);
return b.endObject();
}).iterator(),
Iterators.single((b, p) -> b.endObject())
);
}

@Override
public String toString() {
try {
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
toXContent(builder, EMPTY_PARAMS);
builder.endObject();
return Strings.toString(builder);
} catch (IOException e) {
return "{ \"error\" : \"" + e.getMessage() + "\"}";
}
return Strings.toString(this, true, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@

public abstract class BaseNodesResponse<TNodeResponse extends BaseNodeResponse> extends ActionResponse {

private ClusterName clusterName;
private List<FailedNodeException> failures;
private List<TNodeResponse> nodes;
private final ClusterName clusterName;
private final List<FailedNodeException> failures;
private final List<TNodeResponse> nodes;
private Map<String, TNodeResponse> nodesMap;

protected BaseNodesResponse(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.support.nodes;

import org.elasticsearch.action.FailedNodeException;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.rest.action.RestActions;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public abstract class BaseNodesXContentResponse<TNodeResponse extends BaseNodeResponse> extends BaseNodesResponse<TNodeResponse>
implements
ChunkedToXContent {

protected BaseNodesXContentResponse(ClusterName clusterName, List<TNodeResponse> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}

protected BaseNodesXContentResponse(StreamInput in) throws IOException {
super(in);
}

@Override
public final Iterator<? extends ToXContent> toXContentChunked() {
return Iterators.concat(Iterators.single((b, p) -> {
b.startObject();
RestActions.buildNodesHeader(b, p, this);
return b.field("cluster_name", getClusterName().value());
}), xContentChunks(), Iterators.single((ToXContent) (b, p) -> b.endObject()));
}

@Override
public boolean isFragment() {
return false;
}

protected abstract Iterator<? extends ToXContent> xContentChunks();
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestActions.NodesResponseRestListener;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;

import java.io.IOException;
import java.util.Collections;
Expand All @@ -33,7 +33,7 @@
import static org.elasticsearch.rest.RestRequest.Method.GET;

public class RestNodesStatsAction extends BaseRestHandler {
private DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestNodesStatsAction.class);
private final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestNodesStatsAction.class);
private static final String TYPES_DEPRECATION_MESSAGE = "[types removal] " + "Specifying types in nodes stats requests is deprecated.";

@Override
Expand Down Expand Up @@ -182,7 +182,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC

return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin()
.cluster()
.nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel));
.nodesStats(nodesStatsRequest, new RestChunkedToXContentListener<>(channel));
}

private final Set<String> RESPONSE_PARAMS = Collections.singleton("level");
Expand Down

0 comments on commit 5d784d6

Please sign in to comment.