From 1c426a8233fdd569b03e7ad7ab3ce747acf95323 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 13 Sep 2022 10:15:41 +0200 Subject: [PATCH] Chunked Encoding for NodeStatsResponse Turn this into a chunked response to some degree. Only chunks per node for now, since deeper chunking needs larger changes downstream that don't fit in well with the current API. --- .../admin/cluster/node/stats/NodeStats.java | 30 +++++------ .../node/stats/NodesStatsResponse.java | 43 +++++++--------- .../support/nodes/BaseNodesResponse.java | 6 +-- .../nodes/BaseNodesXContentResponse.java | 50 +++++++++++++++++++ .../admin/cluster/RestNodesStatsAction.java | 6 +-- 5 files changed, 88 insertions(+), 47 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesXContentResponse.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java index 0af4ddefd5bda..0e0a660b1a553 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodeStats.java @@ -40,52 +40,52 @@ */ public class NodeStats extends BaseNodeResponse implements ToXContentFragment { - private long timestamp; + private final long timestamp; @Nullable private NodeIndicesStats indices; @Nullable - private OsStats os; + private final OsStats os; @Nullable - private ProcessStats process; + private final ProcessStats process; @Nullable - private JvmStats jvm; + private final JvmStats jvm; @Nullable - private ThreadPoolStats threadPool; + private final ThreadPoolStats threadPool; @Nullable - private FsInfo fs; + private final FsInfo fs; @Nullable - private TransportStats transport; + private final TransportStats transport; @Nullable - private HttpStats http; + private final HttpStats http; @Nullable - private AllCircuitBreakerStats breaker; + private final AllCircuitBreakerStats breaker; @Nullable - private ScriptStats scriptStats; + private final ScriptStats scriptStats; @Nullable - private ScriptCacheStats scriptCacheStats; + private final ScriptCacheStats scriptCacheStats; @Nullable - private DiscoveryStats discoveryStats; + private final DiscoveryStats discoveryStats; @Nullable - private IngestStats ingestStats; + private final IngestStats ingestStats; @Nullable - private AdaptiveSelectionStats adaptiveSelectionStats; + private final AdaptiveSelectionStats adaptiveSelectionStats; @Nullable - private IndexingPressureStats indexingPressureStats; + private final IndexingPressureStats indexingPressureStats; public NodeStats(StreamInput in) throws IOException { super(in); diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java index 94655739b3168..ac5bcd4a3da8b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/node/stats/NodesStatsResponse.java @@ -9,19 +9,19 @@ package org.elasticsearch.action.admin.cluster.node.stats; import org.elasticsearch.action.FailedNodeException; -import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.action.support.nodes.BaseNodesXContentResponse; import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.xcontent.ToXContentFragment; -import org.elasticsearch.xcontent.XContentBuilder; -import org.elasticsearch.xcontent.XContentFactory; +import org.elasticsearch.xcontent.ToXContent; import java.io.IOException; +import java.util.Iterator; import java.util.List; -public class NodesStatsResponse extends BaseNodesResponse implements ToXContentFragment { +public class NodesStatsResponse extends BaseNodesXContentResponse { public NodesStatsResponse(StreamInput in) throws IOException { super(in); @@ -42,30 +42,21 @@ protected void writeNodesTo(StreamOutput out, List nodes) throws IOEx } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject("nodes"); - for (NodeStats nodeStats : getNodes()) { - builder.startObject(nodeStats.getNode().getId()); - builder.field("timestamp", nodeStats.getTimestamp()); - nodeStats.toXContent(builder, params); - - builder.endObject(); - } - builder.endObject(); - - return builder; + protected Iterator xContentChunks() { + return Iterators.concat( + Iterators.single((b, p) -> b.startObject("nodes")), + getNodes().stream().map(nodeStats -> (ToXContent) (b, p) -> { + b.startObject(nodeStats.getNode().getId()); + b.field("timestamp", nodeStats.getTimestamp()); + nodeStats.toXContent(b, p); + return b.endObject(); + }).iterator(), + Iterators.single((b, p) -> b.endObject()) + ); } @Override public String toString() { - try { - XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint(); - builder.startObject(); - toXContent(builder, EMPTY_PARAMS); - builder.endObject(); - return Strings.toString(builder); - } catch (IOException e) { - return "{ \"error\" : \"" + e.getMessage() + "\"}"; - } + return Strings.toString(this, true, true); } } diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesResponse.java b/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesResponse.java index 209853cad8f65..263ff462c9e5f 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesResponse.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesResponse.java @@ -22,9 +22,9 @@ public abstract class BaseNodesResponse extends ActionResponse { - private ClusterName clusterName; - private List failures; - private List nodes; + private final ClusterName clusterName; + private final List failures; + private final List nodes; private Map nodesMap; protected BaseNodesResponse(StreamInput in) throws IOException { diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesXContentResponse.java b/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesXContentResponse.java new file mode 100644 index 0000000000000..bc00f38079d03 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodesXContentResponse.java @@ -0,0 +1,50 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.support.nodes; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.collect.Iterators; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.xcontent.ChunkedToXContent; +import org.elasticsearch.rest.action.RestActions; +import org.elasticsearch.xcontent.ToXContent; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; + +public abstract class BaseNodesXContentResponse extends BaseNodesResponse + implements + ChunkedToXContent { + + protected BaseNodesXContentResponse(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + protected BaseNodesXContentResponse(StreamInput in) throws IOException { + super(in); + } + + @Override + public final Iterator toXContentChunked() { + return Iterators.concat(Iterators.single((b, p) -> { + b.startObject(); + RestActions.buildNodesHeader(b, p, this); + return b.field("cluster_name", getClusterName().value()); + }), xContentChunks(), Iterators.single((ToXContent) (b, p) -> b.endObject())); + } + + @Override + public boolean isFragment() { + return false; + } + + protected abstract Iterator xContentChunks(); +} diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsAction.java index 2fdca0675da4b..0b1b2e395715f 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestNodesStatsAction.java @@ -17,8 +17,8 @@ import org.elasticsearch.core.RestApiVersion; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.RestActions.NodesResponseRestListener; import org.elasticsearch.rest.action.RestCancellableNodeClient; +import org.elasticsearch.rest.action.RestChunkedToXContentListener; import java.io.IOException; import java.util.Collections; @@ -33,7 +33,7 @@ import static org.elasticsearch.rest.RestRequest.Method.GET; public class RestNodesStatsAction extends BaseRestHandler { - private DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestNodesStatsAction.class); + private final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestNodesStatsAction.class); private static final String TYPES_DEPRECATION_MESSAGE = "[types removal] " + "Specifying types in nodes stats requests is deprecated."; @Override @@ -182,7 +182,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC return channel -> new RestCancellableNodeClient(client, request.getHttpChannel()).admin() .cluster() - .nodesStats(nodesStatsRequest, new NodesResponseRestListener<>(channel)); + .nodesStats(nodesStatsRequest, new RestChunkedToXContentListener<>(channel)); } private final Set RESPONSE_PARAMS = Collections.singleton("level");