Skip to content

Commit

Permalink
Add chunking to ClusterState.Custom impls (#91963)
Browse files Browse the repository at this point in the history
Still combines the chunks together at the upper level, but this is a
step towards full chunking support for `GET _cluster/state`.

Relates #89838
  • Loading branch information
DaveCTurner authored Nov 30, 2022
1 parent da119b0 commit 015e7fb
Show file tree
Hide file tree
Showing 17 changed files with 382 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.function.UnaryOperator;
Expand Down Expand Up @@ -116,7 +117,7 @@ public void writeTo(StreamOutput out) {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) {
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
throw new AssertionError("task should have been cancelled before serializing this custom");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.tracing.Tracer;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.junit.Before;
Expand All @@ -50,6 +51,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -401,8 +403,8 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Collections.emptyIterator();
}

static NamedDiff<ClusterState.Custom> readDiffFrom(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,17 @@
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContent;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -105,7 +108,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>

public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();

public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
public interface Custom extends NamedDiffable<Custom>, ChunkedToXContent {

/**
     * Returns <code>true</code> iff this {@link Custom} is private to the cluster and should never be sent to a client.
Expand All @@ -121,7 +124,7 @@ default boolean isPrivate() {
* the more faithful it is the more useful it is for diagnostics.
*/
@Override
XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException;
Iterator<? extends ToXContent> toXContentChunked(Params params);
}

private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
Expand Down Expand Up @@ -619,7 +622,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (metrics.contains(Metric.CUSTOMS)) {
for (Map.Entry<String, Custom> cursor : customs.entrySet()) {
builder.startObject(cursor.getKey());
cursor.getValue().toXContent(builder, params);
ChunkedToXContent.wrapAsXContentObject(cursor.getValue()).toXContent(builder, params);
builder.endObject();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@

import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public final class RepositoryCleanupInProgress extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {
Expand Down Expand Up @@ -62,17 +64,17 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray(TYPE);
for (Entry entry : entries) {
builder.startObject();
{
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
return Iterators.concat(
Iterators.single((builder, params) -> builder.startArray(TYPE)),
entries.stream().<ToXContent>map(entry -> (builder, params) -> {
builder.startObject();
builder.field("repository", entry.repository);
}
builder.endObject();
}
builder.endArray();
return builder;
builder.endObject();
return builder;
}).iterator(),
Iterators.single((builder, params) -> builder.endArray())
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -398,49 +398,41 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startArray("snapshots");
for (Entry entry : entries.values()) {
toXContent(entry, builder);
}
builder.endArray();
return builder;
}

/**
* Serializes single restore operation
*
* @param entry restore operation metadata
* @param builder XContent builder
*/
public static void toXContent(Entry entry, XContentBuilder builder) throws IOException {
builder.startObject();
builder.field("snapshot", entry.snapshot().getSnapshotId().getName());
builder.field("repository", entry.snapshot().getRepository());
builder.field("state", entry.state());
builder.startArray("indices");
{
for (String index : entry.indices()) {
builder.value(index);
}
}
builder.endArray();
builder.startArray("shards");
{
for (Map.Entry<ShardId, ShardRestoreStatus> shardEntry : entry.shards.entrySet()) {
ShardId shardId = shardEntry.getKey();
ShardRestoreStatus status = shardEntry.getValue();
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
return Iterators.concat(
Iterators.single((builder, params) -> builder.startArray("snapshots")),
entries.values().stream().<ToXContent>map(entry -> (builder, params) -> {
builder.startObject();
builder.field("snapshot", entry.snapshot().getSnapshotId().getName());
builder.field("repository", entry.snapshot().getRepository());
builder.field("state", entry.state());
builder.startArray("indices");
{
builder.field("index", shardId.getIndex());
builder.field("shard", shardId.getId());
builder.field("state", status.state());
for (String index : entry.indices()) {
builder.value(index);
}
}
builder.endArray();
builder.startArray("shards");
{
for (Map.Entry<ShardId, ShardRestoreStatus> shardEntry : entry.shards.entrySet()) {
ShardId shardId = shardEntry.getKey();
ShardRestoreStatus status = shardEntry.getValue();
builder.startObject();
{
builder.field("index", shardId.getIndex());
builder.field("shard", shardId.getId());
builder.field("state", status.state());
}
builder.endObject();
}
}
builder.endObject();
}
}

builder.endArray();
builder.endObject();
builder.endArray();
builder.endObject();
return builder;
}).iterator(),
Iterators.single((builder, params) -> builder.endArray())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -160,25 +162,27 @@ public Version getMinimalSupportedVersion() {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray(TYPE);
for (Entry entry : entries) {
builder.startObject();
{
builder.field("repository", entry.repository());
builder.startArray("snapshots");
for (SnapshotId snapshot : entry.snapshots) {
builder.value(snapshot.getName());
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
return Iterators.concat(
Iterators.single((builder, params) -> builder.startArray(TYPE)),
entries.stream().<ToXContent>map(entry -> (builder, params) -> {
builder.startObject();
{
builder.field("repository", entry.repository());
builder.startArray("snapshots");
for (SnapshotId snapshot : entry.snapshots) {
builder.value(snapshot.getName());
}
builder.endArray();
builder.timeField("start_time_millis", "start_time", entry.startTime);
builder.field("repository_state_id", entry.repositoryStateId);
builder.field("state", entry.state);
}
builder.endArray();
builder.timeField("start_time_millis", "start_time", entry.startTime);
builder.field("repository_state_id", entry.repositoryStateId);
builder.field("state", entry.state);
}
builder.endObject();
}
builder.endArray();
return builder;
builder.endObject();
return builder;
}).iterator(),
Iterators.single((builder, params) -> builder.endArray())
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -212,14 +213,12 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startArray("snapshots");
final Iterator<Entry> iterator = asStream().iterator();
while (iterator.hasNext()) {
iterator.next().toXContent(builder, params);
}
builder.endArray();
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
return Iterators.<ToXContent>concat(
Iterators.single((builder, params) -> builder.startArray("snapshots")),
asStream().iterator(),
Iterators.single((builder, params) -> builder.endArray())
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.RelativeByteSizeValue;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;

/**
Expand Down Expand Up @@ -63,22 +66,19 @@ public static NamedDiff<ClusterState.Custom> readDiffFrom(StreamInput in) throws
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(DISK_METADATA.getPreferredName());
diskMetadata.toXContent(builder, params);
builder.endObject();
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
return Iterators.single((builder, params) -> {
builder.startObject(DISK_METADATA.getPreferredName());
diskMetadata.toXContent(builder, params);
builder.endObject();
return builder;
});
}

public static HealthMetadata getFromClusterState(ClusterState clusterState) {
return clusterState.custom(HealthMetadata.TYPE);
}

@Override
public boolean isFragment() {
return true;
}

public Disk getDiskMetadata() {
return diskMetadata;
}
Expand Down
Loading

0 comments on commit 015e7fb

Please sign in to comment.