Skip to content

Commit

Permalink
Chunked encoding for CCR APIs
Browse files Browse the repository at this point in the history
The CCR info and stats APIs can send fairly sizeable responses that
scale as `O(#shards)`. This commit makes them chunked.

Relates elastic#89838
  • Loading branch information
DaveCTurner committed Dec 22, 2022
1 parent 08515ea commit 11cc633
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@

package org.elasticsearch.xpack.ccr.rest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;

import java.util.List;
Expand All @@ -19,6 +23,8 @@

public class RestCcrStatsAction extends BaseRestHandler {

private static final Logger logger = LogManager.getLogger(RestCcrStatsAction.class);

@Override
public List<Route> routes() {
return List.of(new Route(GET, "/_ccr/stats"));
Expand All @@ -32,7 +38,17 @@ public String getName() {
@Override
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
final CcrStatsAction.Request request = new CcrStatsAction.Request();
return channel -> client.execute(CcrStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
return channel -> client.execute(
CcrStatsAction.INSTANCE,
request,
new ThreadedActionListener<>(
logger,
client.threadPool(),
ThreadPool.Names.MANAGEMENT,
new RestChunkedToXContentListener<>(channel),
false
)
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;

import java.util.List;
Expand All @@ -34,7 +34,7 @@ public String getName() {
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
final FollowInfoAction.Request request = new FollowInfoAction.Request();
request.setFollowerIndices(Strings.splitStringByCommaToArray(restRequest.param("index")));
return channel -> client.execute(FollowInfoAction.INSTANCE, request, new RestToXContentListener<>(channel));
return channel -> client.execute(FollowInfoAction.INSTANCE, request, new RestChunkedToXContentListener<>(channel));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@

package org.elasticsearch.xpack.ccr.rest;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.support.ThreadedActionListener;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;

import java.util.List;
Expand All @@ -20,6 +24,8 @@

public class RestFollowStatsAction extends BaseRestHandler {

private static final Logger logger = LogManager.getLogger(RestFollowStatsAction.class);

@Override
public List<Route> routes() {
return List.of(new Route(GET, "/{index}/_ccr/stats"));
Expand All @@ -34,7 +40,17 @@ public String getName() {
protected RestChannelConsumer prepareRequest(final RestRequest restRequest, final NodeClient client) {
final FollowStatsAction.StatsRequest request = new FollowStatsAction.StatsRequest();
request.setIndices(Strings.splitStringByCommaToArray(restRequest.param("index")));
return channel -> client.execute(FollowStatsAction.INSTANCE, request, new RestToXContentListener<>(channel));
return channel -> client.execute(
FollowStatsAction.INSTANCE,
request,
new ThreadedActionListener<>(
logger,
client.threadPool(),
ThreadPool.Names.MANAGEMENT,
new RestChunkedToXContentListener<>(channel),
false
)
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;
import org.elasticsearch.xpack.core.ccr.action.CcrStatsAction;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;

import java.io.IOException;

import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.ccr.action.AutoFollowStatsTests.randomReadExceptions;
import static org.elasticsearch.xpack.ccr.action.AutoFollowStatsTests.randomTrackingClusters;
import static org.elasticsearch.xpack.ccr.action.StatsResponsesTests.createStatsResponse;
Expand All @@ -35,4 +40,19 @@ protected CcrStatsAction.Response createTestInstance() {
FollowStatsAction.StatsResponses statsResponse = createStatsResponse();
return new CcrStatsAction.Response(autoFollowStats, statsResponse);
}

public void testChunking() throws IOException {
final var instance = createTestInstance();
int chunkCount = 0;
try (var builder = jsonBuilder()) {
final var iterator = instance.toXContentChunked(EMPTY_PARAMS);
while (iterator.hasNext()) {
iterator.next().toXContent(builder, ToXContent.EMPTY_PARAMS);
chunkCount += 1;
}
} // closing the builder verifies that the XContent is well-formed

var indexCount = instance.getFollowStats().getStatsResponses().stream().map(s -> s.status().followerIndex()).distinct().count();
assertEquals(instance.getFollowStats().getStatsResponses().size() + indexCount * 2 + 4, chunkCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractXContentSerializingTestCase;
import org.elasticsearch.test.AbstractChunkedSerializingTestCase;
import org.elasticsearch.xcontent.ConstructingObjectParser;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction;
import org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FollowerInfo;
Expand All @@ -18,20 +19,23 @@
import java.util.ArrayList;
import java.util.List;

import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.FOLLOWER_INDICES_FIELD;
import static org.elasticsearch.xpack.core.ccr.action.FollowInfoAction.Response.Status;

public class FollowInfoResponseTests extends AbstractXContentSerializingTestCase<FollowInfoAction.Response> {
public class FollowInfoResponseTests extends AbstractChunkedSerializingTestCase<FollowInfoAction.Response> {

static final ConstructingObjectParser<FollowerInfo, Void> INFO_PARSER = new ConstructingObjectParser<>("info_parser", args -> {
return new FollowerInfo(
static final ConstructingObjectParser<FollowerInfo, Void> INFO_PARSER = new ConstructingObjectParser<>(
"info_parser",
args -> new FollowerInfo(
(String) args[0],
(String) args[1],
(String) args[2],
Status.fromString((String) args[3]),
(FollowParameters) args[4]
);
});
)
);

static {
INFO_PARSER.declareString(ConstructingObjectParser.constructorArg(), FollowerInfo.FOLLOWER_INDEX_FIELD);
Expand All @@ -48,15 +52,15 @@ public class FollowInfoResponseTests extends AbstractXContentSerializingTestCase
@SuppressWarnings("unchecked")
static final ConstructingObjectParser<FollowInfoAction.Response, Void> PARSER = new ConstructingObjectParser<>(
"response",
args -> { return new FollowInfoAction.Response((List<FollowerInfo>) args[0]); }
args -> new FollowInfoAction.Response((List<FollowerInfo>) args[0])
);

static {
PARSER.declareObjectArray(ConstructingObjectParser.constructorArg(), INFO_PARSER, FOLLOWER_INDICES_FIELD);
}

@Override
protected FollowInfoAction.Response doParseInstance(XContentParser parser) throws IOException {
protected FollowInfoAction.Response doParseInstance(XContentParser parser) {
return PARSER.apply(parser, null);
}

Expand Down Expand Up @@ -87,4 +91,18 @@ protected FollowInfoAction.Response createTestInstance() {
}
return new FollowInfoAction.Response(infos);
}

public void testChunking() throws IOException {
final var instance = createTestInstance();
int chunkCount = 0;
try (var builder = jsonBuilder()) {
final var iterator = instance.toXContentChunked(EMPTY_PARAMS);
while (iterator.hasNext()) {
iterator.next().toXContent(builder, ToXContent.EMPTY_PARAMS);
chunkCount += 1;
}
} // closing the builder verifies that the XContent is well-formed

assertEquals(instance.getFollowInfos().size() + 2, chunkCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
import org.elasticsearch.xpack.core.ccr.action.FollowStatsAction;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;

public class StatsResponsesTests extends AbstractWireSerializingTestCase<FollowStatsAction.StatsResponses> {

@Override
Expand Down Expand Up @@ -67,4 +72,19 @@ static FollowStatsAction.StatsResponses createStatsResponse() {
}
return new FollowStatsAction.StatsResponses(Collections.emptyList(), Collections.emptyList(), responses);
}

public void testChunking() throws IOException {
final var instance = createTestInstance();
int chunkCount = 0;
try (var builder = jsonBuilder()) {
final var iterator = instance.toXContentChunked(EMPTY_PARAMS);
while (iterator.hasNext()) {
iterator.next().toXContent(builder, ToXContent.EMPTY_PARAMS);
chunkCount += 1;
}
} // closing the builder verifies that the XContent is well-formed

var indexCount = instance.getStatsResponses().stream().map(s -> s.status().followerIndex()).distinct().count();
assertEquals(instance.getStatsResponses().size() + indexCount * 2 + 2, chunkCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xpack.core.ccr.AutoFollowStats;

import java.io.IOException;
import java.util.Iterator;
import java.util.Objects;

public class CcrStatsAction extends ActionType<CcrStatsAction.Response> {
Expand Down Expand Up @@ -48,7 +50,7 @@ public void writeTo(StreamOutput out) throws IOException {
}
}

public static class Response extends ActionResponse implements ToXContentObject {
public static class Response extends ActionResponse implements ChunkedToXContent {

private final AutoFollowStats autoFollowStats;
private final FollowStatsAction.StatsResponses followStats;
Expand Down Expand Up @@ -79,14 +81,14 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
builder.field("auto_follow_stats", autoFollowStats, params);
builder.field("follow_stats", followStats, params);
}
builder.endObject();
return builder;
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params outerParams) {
return Iterators.concat(
Iterators.single(
(builder, params) -> builder.startObject().field("auto_follow_stats", autoFollowStats, params).field("follow_stats")
),
followStats.toXContentChunked(outerParams),
Iterators.single((builder, params) -> builder.endObject())
);
}

@Override
Expand All @@ -101,6 +103,7 @@ public boolean equals(Object o) {
public int hashCode() {
return Objects.hash(autoFollowStats, followStats);
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,19 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeReadRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -77,7 +81,7 @@ public int hashCode() {
}
}

public static class Response extends ActionResponse implements ToXContentObject {
public static class Response extends ActionResponse implements ChunkedToXContent {

public static final ParseField FOLLOWER_INDICES_FIELD = new ParseField("follower_indices");

Expand All @@ -102,15 +106,12 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray(FOLLOWER_INDICES_FIELD.getPreferredName());
for (FollowerInfo followInfo : followInfos) {
followInfo.toXContent(builder, params);
}
builder.endArray();
builder.endObject();
return builder;
public Iterator<ToXContent> toXContentChunked(ToXContent.Params outerParams) {
return Iterators.concat(
Iterators.single((builder, params) -> builder.startObject().startArray(FOLLOWER_INDICES_FIELD.getPreferredName())),
followInfos.iterator(),
Iterators.single((builder, params) -> builder.endArray().endObject())
);
}

@Override
Expand Down
Loading

0 comments on commit 11cc633

Please sign in to comment.