Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add chunking to ClusterState.Custom impls #91963

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.function.UnaryOperator;
Expand Down Expand Up @@ -116,7 +117,7 @@ public void writeTo(StreamOutput out) {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) {
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
throw new AssertionError("task should have been cancelled before serializing this custom");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.elasticsearch.tracing.Tracer;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.xcontent.NamedXContentRegistry;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.XContentFactory;
import org.junit.Before;
Expand All @@ -50,6 +51,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -401,8 +403,8 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Collections.emptyIterator();
}

static NamedDiff<ClusterState.Custom> readDiffFrom(StreamInput in) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,17 @@
import org.elasticsearch.common.io.stream.VersionedNamedWriteable;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContent;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -105,7 +108,7 @@ public class ClusterState implements ToXContentFragment, Diffable<ClusterState>

public static final ClusterState EMPTY_STATE = builder(ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)).build();

public interface Custom extends NamedDiffable<Custom>, ToXContentFragment {
public interface Custom extends NamedDiffable<Custom>, ChunkedToXContent {

/**
* Returns <code>true</code> iff this {@link Custom} is private to the cluster and should never be send to a client.
Expand All @@ -121,7 +124,7 @@ default boolean isPrivate() {
* the more faithful it is the more useful it is for diagnostics.
*/
@Override
XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException;
Iterator<? extends ToXContent> toXContentChunked(Params params);
}

private static final NamedDiffableValueSerializer<Custom> CUSTOM_VALUE_SERIALIZER = new NamedDiffableValueSerializer<>(Custom.class);
Expand Down Expand Up @@ -619,7 +622,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (metrics.contains(Metric.CUSTOMS)) {
for (Map.Entry<String, Custom> cursor : customs.entrySet()) {
builder.startObject(cursor.getKey());
cursor.getValue().toXContent(builder, params);
ChunkedToXContent.wrapAsXContentObject(cursor.getValue()).toXContent(builder, params);
builder.endObject();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@

import org.elasticsearch.Version;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public final class RepositoryCleanupInProgress extends AbstractNamedDiffable<ClusterState.Custom> implements ClusterState.Custom {
Expand Down Expand Up @@ -62,17 +64,17 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray(TYPE);
for (Entry entry : entries) {
builder.startObject();
{
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
return Iterators.concat(
Iterators.single((builder, params) -> builder.startArray(TYPE)),
entries.stream().<ToXContent>map(entry -> (builder, params) -> {
builder.startObject();
builder.field("repository", entry.repository);
}
builder.endObject();
}
builder.endArray();
return builder;
builder.endObject();
return builder;
}).iterator(),
Iterators.single((builder, params) -> builder.endArray())
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Collections;
Expand Down Expand Up @@ -398,49 +398,41 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startArray("snapshots");
for (Entry entry : entries.values()) {
toXContent(entry, builder);
}
builder.endArray();
return builder;
}

/**
* Serializes single restore operation
*
* @param entry restore operation metadata
* @param builder XContent builder
*/
public static void toXContent(Entry entry, XContentBuilder builder) throws IOException {
builder.startObject();
builder.field("snapshot", entry.snapshot().getSnapshotId().getName());
builder.field("repository", entry.snapshot().getRepository());
builder.field("state", entry.state());
builder.startArray("indices");
{
for (String index : entry.indices()) {
builder.value(index);
}
}
builder.endArray();
builder.startArray("shards");
{
for (Map.Entry<ShardId, ShardRestoreStatus> shardEntry : entry.shards.entrySet()) {
ShardId shardId = shardEntry.getKey();
ShardRestoreStatus status = shardEntry.getValue();
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
return Iterators.concat(
Iterators.single((builder, params) -> builder.startArray("snapshots")),
entries.values().stream().<ToXContent>map(entry -> (builder, params) -> {
builder.startObject();
builder.field("snapshot", entry.snapshot().getSnapshotId().getName());
builder.field("repository", entry.snapshot().getRepository());
builder.field("state", entry.state());
builder.startArray("indices");
{
builder.field("index", shardId.getIndex());
builder.field("shard", shardId.getId());
builder.field("state", status.state());
for (String index : entry.indices()) {
builder.value(index);
}
}
builder.endArray();
builder.startArray("shards");
{
for (Map.Entry<ShardId, ShardRestoreStatus> shardEntry : entry.shards.entrySet()) {
ShardId shardId = shardEntry.getKey();
ShardRestoreStatus status = shardEntry.getValue();
builder.startObject();
{
builder.field("index", shardId.getIndex());
builder.field("shard", shardId.getId());
builder.field("state", status.state());
}
builder.endObject();
}
}
builder.endObject();
}
}

builder.endArray();
builder.endObject();
builder.endArray();
builder.endObject();
return builder;
}).iterator(),
Iterators.single((builder, params) -> builder.endArray())
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,21 @@
import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.repositories.RepositoryOperation;
import org.elasticsearch.snapshots.SnapshotId;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
Expand Down Expand Up @@ -160,25 +162,27 @@ public Version getMinimalSupportedVersion() {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startArray(TYPE);
for (Entry entry : entries) {
builder.startObject();
{
builder.field("repository", entry.repository());
builder.startArray("snapshots");
for (SnapshotId snapshot : entry.snapshots) {
builder.value(snapshot.getName());
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
return Iterators.concat(
Iterators.single((builder, params) -> builder.startArray(TYPE)),
entries.stream().<ToXContent>map(entry -> (builder, params) -> {
builder.startObject();
{
builder.field("repository", entry.repository());
builder.startArray("snapshots");
for (SnapshotId snapshot : entry.snapshots) {
builder.value(snapshot.getName());
}
builder.endArray();
builder.timeField("start_time_millis", "start_time", entry.startTime);
builder.field("repository_state_id", entry.repositoryStateId);
builder.field("state", entry.state);
}
builder.endArray();
builder.timeField("start_time_millis", "start_time", entry.startTime);
builder.field("repository_state_id", entry.repositoryStateId);
builder.field("state", entry.state);
}
builder.endObject();
}
builder.endArray();
return builder;
builder.endObject();
return builder;
}).iterator(),
Iterators.single((builder, params) -> builder.endArray())
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.cluster.ClusterState.Custom;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -212,14 +213,12 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params params) throws IOException {
builder.startArray("snapshots");
final Iterator<Entry> iterator = asStream().iterator();
while (iterator.hasNext()) {
iterator.next().toXContent(builder, params);
}
builder.endArray();
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
return Iterators.<ToXContent>concat(
Iterators.single((builder, params) -> builder.startArray("snapshots")),
asStream().iterator(),
Iterators.single((builder, params) -> builder.endArray())
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@
import org.elasticsearch.cluster.AbstractNamedDiffable;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.RelativeByteSizeValue;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentFragment;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;

/**
Expand Down Expand Up @@ -63,22 +66,19 @@ public static NamedDiff<ClusterState.Custom> readDiffFrom(StreamInput in) throws
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject(DISK_METADATA.getPreferredName());
diskMetadata.toXContent(builder, params);
builder.endObject();
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
return Iterators.single((builder, params) -> {
builder.startObject(DISK_METADATA.getPreferredName());
diskMetadata.toXContent(builder, params);
builder.endObject();
return builder;
});
}

public static HealthMetadata getFromClusterState(ClusterState clusterState) {
return clusterState.custom(HealthMetadata.TYPE);
}

@Override
public boolean isFragment() {
return true;
}

public Disk getDiskMetadata() {
return diskMetadata;
}
Expand Down
Loading