Skip to content

Commit

Permalink
Chunked encoding for cluster state API (elastic#92285)
Browse files Browse the repository at this point in the history
  • Loading branch information
DaveCTurner authored Dec 30, 2022
1 parent 3fbb109 commit f332fc2
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 197 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.elasticsearch.rest.AbstractRestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestResponseUtils;
import org.elasticsearch.rest.action.admin.cluster.RestClusterStateAction;
import org.elasticsearch.rest.action.admin.cluster.RestGetRepositoriesAction;
import org.elasticsearch.snapshots.mockstore.MockRepository;
Expand Down Expand Up @@ -525,8 +526,9 @@ public void sendResponse(RestResponse response) {
@Override
public void sendResponse(RestResponse response) {
try {
assertThat(response.content().utf8ToString(), containsString("notsecretusername"));
assertThat(response.content().utf8ToString(), not(containsString("verysecretpassword")));
final var responseBody = RestResponseUtils.getBodyContent(response).utf8ToString();
assertThat(responseBody, containsString("notsecretusername"));
assertThat(responseBody, not(containsString("verysecretpassword")));
} catch (AssertionError ex) {
clusterStateError.set(ex);
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.xcontent.ToXContentObject;
Expand Down Expand Up @@ -75,7 +76,8 @@ protected void addCustomFields(XContentBuilder builder, Params params) throws IO
deprecationLogger.critical(DeprecationCategory.API, "reroute_cluster_state", STATE_FIELD_DEPRECATION_MESSAGE);
}
builder.startObject("state");
state.toXContent(builder, params);
// TODO this should be chunked, see #89838
ChunkedToXContent.wrapAsToXContent(state).toXContent(builder, params);
builder.endObject();
}

Expand Down
222 changes: 119 additions & 103 deletions server/src/main/java/org/elasticsearch/cluster/ClusterState.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,9 @@
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingNode;
import org.elasticsearch.cluster.routing.RoutingNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterApplierService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.cluster.service.MasterService;
Expand All @@ -35,6 +32,7 @@
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
Expand All @@ -44,13 +42,13 @@
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContent;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -59,6 +57,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.function.Function;

import static org.elasticsearch.gateway.GatewayService.STATE_NOT_RECOVERED_BLOCK;

Expand Down Expand Up @@ -95,7 +94,7 @@
* <p>
* Cluster state updates can be used to trigger various actions via a {@link ClusterStateListener} rather than using a timer.
* <p>
* Implements {@link ToXContentFragment} to be exposed in REST APIs (e.g. {@code GET _cluster/state} and {@code POST _cluster/reroute}) and
* Implements {@link ChunkedToXContent} to be exposed in REST APIs (e.g. {@code GET _cluster/state} and {@code POST _cluster/reroute}) and
* to be indexed by monitoring, mostly just for diagnostics purposes. The {@link XContent} representation does not need to be 100% faithful
* since we never reconstruct a cluster state from its XContent representation, but the more faithful it is the more useful it is for
* diagnostics. Note that the {@link XContent} representation of the {@link Metadata} portion does have to be faithful (in {@link
Expand All @@ -104,7 +103,7 @@
* Security-sensitive data such as passwords or private keys should not be stored in the cluster state, since the contents of the cluster
* state are exposed in various APIs.
*/
public class ClusterState implements ToXContentFragment, Diffable<ClusterState> {
public class ClusterState implements ChunkedToXContent, Diffable<ClusterState> {

public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();

Expand All @@ -124,7 +123,7 @@ default boolean isPrivate() {
* the more faithful it is the more useful it is for diagnostics.
*/
@Override
Iterator<? extends ToXContent> toXContentChunked(Params params);
Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params);
}

private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
Expand Down Expand Up @@ -520,114 +519,131 @@ public String toString() {
}
}

private static <T> Iterator<ToXContent> chunkedSection(
boolean condition,
ToXContent before,
Iterator<T> items,
Function<T, Iterator<ToXContent>> fn,
ToXContent after
) {
return condition
? Iterators.concat(Iterators.single(before), Iterators.flatMap(items, fn::apply), Iterators.single(after))
: Collections.emptyIterator();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
EnumSet<Metric> metrics = Metric.parseString(params.param("metric", "_all"), true);
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
final var metrics = Metric.parseString(outerParams.param("metric", "_all"), true);

// always provide the cluster_uuid as part of the top-level response (also part of the metadata response)
builder.field("cluster_uuid", metadata().clusterUUID());
return Iterators.concat(

if (metrics.contains(Metric.VERSION)) {
builder.field("version", version);
builder.field("state_uuid", stateUUID);
}
// header chunk
Iterators.single(((builder, params) -> {
// always provide the cluster_uuid as part of the top-level response (also part of the metadata response)
builder.field("cluster_uuid", metadata().clusterUUID());

if (metrics.contains(Metric.MASTER_NODE)) {
builder.field("master_node", nodes().getMasterNodeId());
}

if (metrics.contains(Metric.BLOCKS)) {
builder.startObject("blocks");
// state version info
if (metrics.contains(Metric.VERSION)) {
builder.field("version", version);
builder.field("state_uuid", stateUUID);
}

if (blocks().global().isEmpty() == false) {
builder.startObject("global");
for (ClusterBlock block : blocks().global()) {
block.toXContent(builder, params);
// master node
if (metrics.contains(Metric.MASTER_NODE)) {
builder.field("master_node", nodes().getMasterNodeId());
}
builder.endObject();
}

if (blocks().indices().isEmpty() == false) {
builder.startObject("indices");
for (Map.Entry<String, Set<ClusterBlock>> entry : blocks().indices().entrySet()) {
builder.startObject(entry.getKey());
for (ClusterBlock block : entry.getValue()) {
return builder;
})),

// blocks
chunkedSection(metrics.contains(Metric.BLOCKS), (builder, params) -> {
builder.startObject("blocks");
if (blocks().global().isEmpty() == false) {
builder.startObject("global");
for (ClusterBlock block : blocks().global()) {
block.toXContent(builder, params);
}
builder.endObject();
}
builder.endObject();
}

builder.endObject();
}

// nodes
if (metrics.contains(Metric.NODES)) {
builder.startObject("nodes");
for (DiscoveryNode node : nodes) {
node.toXContent(builder, params);
}
builder.endObject();
}

// meta data
if (metrics.contains(Metric.METADATA)) {
ChunkedToXContent.wrapAsToXContent(metadata).toXContent(builder, params);
}

// routing table
if (metrics.contains(Metric.ROUTING_TABLE)) {
builder.startObject("routing_table");
builder.startObject("indices");
for (IndexRoutingTable indexRoutingTable : routingTable()) {
builder.startObject(indexRoutingTable.getIndex().getName());
builder.startObject("shards");
for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId);
builder.startArray(Integer.toString(indexShardRoutingTable.shardId().id()));
for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
indexShardRoutingTable.shard(copy).toXContent(builder, params);
}
builder.endArray();
if (blocks().indices().isEmpty() == false) {
builder.startObject("indices");
}
builder.endObject();
builder.endObject();
}
builder.endObject();
builder.endObject();
}

// routing nodes
if (metrics.contains(Metric.ROUTING_NODES)) {
builder.startObject("routing_nodes");
builder.startArray("unassigned");
for (ShardRouting shardRouting : getRoutingNodes().unassigned()) {
shardRouting.toXContent(builder, params);
}
builder.endArray();

builder.startObject("nodes");
for (RoutingNode routingNode : getRoutingNodes()) {
builder.startArray(routingNode.nodeId() == null ? "null" : routingNode.nodeId());
for (ShardRouting shardRouting : routingNode) {
shardRouting.toXContent(builder, params);
return builder;
}, blocks.indices().entrySet().iterator(), entry -> Iterators.single((builder, params) -> {
builder.startObject(entry.getKey());
for (ClusterBlock block : entry.getValue()) {
block.toXContent(builder, params);
}
builder.endArray();
}
builder.endObject();

builder.endObject();
}
if (metrics.contains(Metric.CUSTOMS)) {
for (Map.Entry<String, Custom> cursor : customs.entrySet()) {
builder.startObject(cursor.getKey());
ChunkedToXContent.wrapAsToXContent(cursor.getValue()).toXContent(builder, params);
builder.endObject();
}
}

return builder;
return builder.endObject();
}), (builder, params) -> {
if (blocks().indices().isEmpty() == false) {
builder.endObject();
}
return builder.endObject();
}),

// nodes
chunkedSection(
metrics.contains(Metric.NODES),
(builder, params) -> builder.startObject("nodes"),
nodes.iterator(),
Iterators::single,
(builder, params) -> builder.endObject()
),

// metadata
metrics.contains(Metric.METADATA) ? metadata.toXContentChunked(outerParams) : Collections.emptyIterator(),

// routing table
chunkedSection(
metrics.contains(Metric.ROUTING_TABLE),
(builder, params) -> builder.startObject("routing_table").startObject("indices"),
routingTable().iterator(),
indexRoutingTable -> Iterators.single((builder, params) -> {
builder.startObject(indexRoutingTable.getIndex().getName());
builder.startObject("shards");
for (int shardId = 0; shardId < indexRoutingTable.size(); shardId++) {
IndexShardRoutingTable indexShardRoutingTable = indexRoutingTable.shard(shardId);
builder.startArray(Integer.toString(indexShardRoutingTable.shardId().id()));
for (int copy = 0; copy < indexShardRoutingTable.size(); copy++) {
indexShardRoutingTable.shard(copy).toXContent(builder, params);
}
builder.endArray();
}
return builder.endObject().endObject();
}),
(builder, params) -> builder.endObject().endObject()
),

// routing nodes
chunkedSection(
metrics.contains(Metric.ROUTING_NODES),
(builder, params) -> builder.startObject("routing_nodes").startArray("unassigned"),
getRoutingNodes().unassigned().iterator(),
Iterators::single,
(builder, params) -> builder.endArray() // no endObject() here, continued in next chunkedSection()
),
chunkedSection(
metrics.contains(Metric.ROUTING_NODES),
(builder, params) -> builder.startObject("nodes"),
getRoutingNodes().iterator(),
routingNode -> Iterators.concat(
ChunkedToXContentHelper.startArray(routingNode.nodeId() == null ? "null" : routingNode.nodeId()),
routingNode.iterator(),
ChunkedToXContentHelper.endArray()
),
(builder, params) -> builder.endObject().endObject()
),

// customs
metrics.contains(Metric.CUSTOMS)
? Iterators.flatMap(
customs.entrySet().iterator(),
cursor -> ChunkedToXContentHelper.wrapWithObject(cursor.getKey(), cursor.getValue().toXContentChunked(outerParams))
)
: Collections.emptyIterator()
);
}

public static Builder builder(ClusterName clusterName) {
Expand Down

This file was deleted.

Loading

0 comments on commit f332fc2

Please sign in to comment.