Skip to content

Commit

Permalink
Chunked encoding for cluster reroute API
Browse files Browse the repository at this point in the history
The cluster reroute API (optionally) returns the cluster state in its
response, which can therefore be rather large. elastic#92285 enables a chunked
encoding of the cluster state, and this commit adjusts the reroute API
to make use of this encoding too.
  • Loading branch information
DaveCTurner committed Dec 31, 2022
1 parent f332fc2 commit 3dc7bda
Show file tree
Hide file tree
Showing 9 changed files with 181 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,32 @@

package org.elasticsearch.action.admin.cluster.reroute;

import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.master.IsAcknowledgeSupplier;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.allocation.RoutingExplanations;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.logging.DeprecationCategory;
import org.elasticsearch.common.logging.DeprecationLogger;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.common.xcontent.ChunkedToXContentHelper;
import org.elasticsearch.common.xcontent.ChunkedToXContentObject;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;

import static org.elasticsearch.action.support.master.AcknowledgedResponse.ACKNOWLEDGED_KEY;

/**
* Response returned after a cluster reroute request
*/
public class ClusterRerouteResponse extends AcknowledgedResponse implements ToXContentObject {
public class ClusterRerouteResponse extends ActionResponse implements IsAcknowledgeSupplier, ChunkedToXContentObject {

private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(RestSearchAction.class);
public static final String STATE_FIELD_DEPRECATION_MESSAGE = "The [state] field in the response to the reroute API is deprecated "
Expand All @@ -38,15 +44,17 @@ public class ClusterRerouteResponse extends AcknowledgedResponse implements ToXC
*/
private final ClusterState state;
private final RoutingExplanations explanations;
private final boolean acknowledged;

ClusterRerouteResponse(StreamInput in) throws IOException {
super(in);
acknowledged = in.readBoolean();
state = ClusterState.readFrom(in, null);
explanations = RoutingExplanations.readFrom(in);
}

ClusterRerouteResponse(boolean acknowledged, ClusterState state, RoutingExplanations explanations) {
super(acknowledged);
this.acknowledged = acknowledged;
this.state = state;
this.explanations = explanations;
}
Expand All @@ -62,27 +70,45 @@ public RoutingExplanations getExplanations() {
return this.explanations;
}

@Override
public final boolean isAcknowledged() {
return acknowledged;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(acknowledged);
state.writeTo(out);
RoutingExplanations.writeTo(explanations, out);
}

private boolean emitState(ToXContent.Params params) {
return Objects.equals(params.param("metric"), "none") == false;
}

@Override
protected void addCustomFields(XContentBuilder builder, Params params) throws IOException {
if (Objects.equals(params.param("metric"), "none") == false) {
if (builder.getRestApiVersion() != RestApiVersion.V_7) {
deprecationLogger.critical(DeprecationCategory.API, "reroute_cluster_state", STATE_FIELD_DEPRECATION_MESSAGE);
}
builder.startObject("state");
// TODO this should be chunked, see #89838
ChunkedToXContent.wrapAsToXContent(state).toXContent(builder, params);
builder.endObject();
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
if (emitState(outerParams)) {
deprecationLogger.critical(DeprecationCategory.API, "reroute_cluster_state", STATE_FIELD_DEPRECATION_MESSAGE);
}
return toXContentChunkedV7(outerParams);
}

if (params.paramAsBoolean("explain", false)) {
explanations.toXContent(builder, params);
}
@Override
public Iterator<? extends ToXContent> toXContentChunkedV7(ToXContent.Params outerParams) {
return Iterators.concat(
Iterators.single((builder, params) -> builder.startObject().field(ACKNOWLEDGED_KEY, isAcknowledged())),
emitState(outerParams)
? ChunkedToXContentHelper.wrapWithObject("state", state.toXContentChunked(outerParams))
: Collections.emptyIterator(),
Iterators.single((builder, params) -> {
if (params.paramAsBoolean("explain", false)) {
explanations.toXContent(builder, params);
}

builder.endObject();
return builder;
})
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/
package org.elasticsearch.action.support.master;

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.client.internal.ElasticsearchClient;
import org.elasticsearch.core.TimeValue;
Expand All @@ -16,7 +17,7 @@
*/
public abstract class AcknowledgedRequestBuilder<
Request extends AcknowledgedRequest<Request>,
Response extends AcknowledgedResponse,
Response extends ActionResponse & IsAcknowledgeSupplier,
RequestBuilder extends AcknowledgedRequestBuilder<Request, Response, RequestBuilder>> extends MasterNodeOperationRequestBuilder<
Request,
Response,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@
/**
* A response that indicates that a request has been acknowledged
*/
public class AcknowledgedResponse extends ActionResponse implements ToXContentObject {
public class AcknowledgedResponse extends ActionResponse implements IsAcknowledgeSupplier, ToXContentObject {

public static final AcknowledgedResponse TRUE = new AcknowledgedResponse(true);

public static final AcknowledgedResponse FALSE = new AcknowledgedResponse(false);

private static final ParseField ACKNOWLEDGED = new ParseField("acknowledged");
public static final String ACKNOWLEDGED_KEY = "acknowledged";
private static final ParseField ACKNOWLEDGED = new ParseField(ACKNOWLEDGED_KEY);

protected static <T extends AcknowledgedResponse> void declareAcknowledgedField(ConstructingObjectParser<T, Void> objectParser) {
objectParser.declareField(
Expand Down Expand Up @@ -65,6 +66,7 @@ protected AcknowledgedResponse(boolean acknowledged) {
* Returns whether the response is acknowledged or not
* @return true if the response is acknowledged, false otherwise
*/
@Override
public final boolean isAcknowledged() {
return acknowledged;
}
Expand All @@ -77,7 +79,7 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public final XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(ACKNOWLEDGED.getPreferredName(), isAcknowledged());
builder.field(ACKNOWLEDGED_KEY, isAcknowledged());
addCustomFields(builder, params);
builder.endObject();
return builder;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.support.master;

public interface IsAcknowledgeSupplier {
boolean isAcknowledged();
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.elasticsearch.common.xcontent;

import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
Expand All @@ -29,6 +30,14 @@ public interface ChunkedToXContent {
*/
Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params);

/**
* Similar to {@link #toXContentChunked} but for the {@link RestApiVersion#V_7} API. Note that chunked response bodies cannot send
* deprecation warning headers once transmission has started, so implementations must check for deprecated feature use before returning.
*/
default Iterator<? extends ToXContent> toXContentChunkedV7(ToXContent.Params params) {
return toXContentChunked(params);
}

/**
* Wraps the given instance in a {@link ToXContent} that will fully serialize the instance when serialized.
* @param chunkedToXContent instance to wrap
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.Streams;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;
Expand Down Expand Up @@ -82,7 +83,9 @@ public void write(byte[] b, int off, int len) throws IOException {
Streams.noCloseStream(out)
);

private final Iterator<? extends ToXContent> serialization = chunkedToXContent.toXContentChunked(params);
private final Iterator<? extends ToXContent> serialization = builder.getRestApiVersion() == RestApiVersion.V_7
? chunkedToXContent.toXContentChunkedV7(params)
: chunkedToXContent.toXContentChunked(params);

private BytesStream target;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.elasticsearch.common.settings.SettingsFilter;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
import org.elasticsearch.xcontent.ObjectParser;
import org.elasticsearch.xcontent.ObjectParser.ValueType;
import org.elasticsearch.xcontent.ParseField;
Expand Down Expand Up @@ -82,7 +82,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
if (metric == null) {
request.params().put("metric", DEFAULT_METRICS);
}
return channel -> client.admin().cluster().reroute(clusterRerouteRequest, new RestToXContentListener<>(channel));
return channel -> client.admin().cluster().reroute(clusterRerouteRequest, new RestChunkedToXContentListener<>(channel));
}

@Override
Expand Down
Loading

0 comments on commit 3dc7bda

Please sign in to comment.