Skip to content

Commit

Permalink
Chunked encoding for RestGetIndicesAction (#92016)
Browse files Browse the repository at this point in the history
This response scales with the number of indices requested and can reach
many MiB in size in a large cluster, let's use chunking here.

Relates #89838
  • Loading branch information
DaveCTurner authored Nov 30, 2022
1 parent c895331 commit 75de8f8
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.elasticsearch.rest.RestStatus.OK;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.nullValue;

public class Netty4HeadBodyIsEmptyIT extends ESRestTestCase {
public void testHeadRoot() throws IOException {
Expand Down Expand Up @@ -59,8 +60,8 @@ public void testDocumentExists() throws IOException {

public void testIndexExists() throws IOException {
createTestDoc();
headTestCase("/test", emptyMap(), greaterThan(0));
headTestCase("/test", singletonMap("pretty", "true"), greaterThan(0));
headTestCase("/test", emptyMap(), nullValue(Integer.class));
headTestCase("/test", singletonMap("pretty", "true"), nullValue(Integer.class));
}

public void testAliasExists() throws IOException {
Expand Down Expand Up @@ -177,7 +178,8 @@ private void headTestCase(
request.setOptions(expectWarnings(expectedWarnings));
Response response = client().performRequest(request);
assertEquals(expectedStatusCode, response.getStatusLine().getStatusCode());
assertThat(Integer.valueOf(response.getHeader("Content-Length")), matcher);
final var contentLength = response.getHeader("Content-Length");
assertThat(contentLength == null ? null : Integer.valueOf(contentLength), matcher);
assertNull("HEAD requests shouldn't have a response body but " + url + " did", response.getEntity());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestResetFeatureStateAction());
registerHandler.accept(new RestGetFeatureUpgradeStatusAction());
registerHandler.accept(new RestPostFeatureUpgradeAction());
registerHandler.accept(new RestGetIndicesAction(threadPool));
registerHandler.accept(new RestGetIndicesAction());
registerHandler.accept(new RestIndicesStatsAction());
registerHandler.accept(new RestIndicesSegmentsAction());
registerHandler.accept(new RestIndicesShardStoresAction());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -33,7 +35,7 @@
/**
* A response for a get index action.
*/
public class GetIndexResponse extends ActionResponse implements ToXContentObject {
public class GetIndexResponse extends ActionResponse implements ChunkedToXContent {

private Map<String, MappingMetadata> mappings = Map.of();
private Map<String, List<AliasMetadata>> aliases = Map.of();
Expand Down Expand Up @@ -178,59 +180,58 @@ public void writeTo(StreamOutput out) throws IOException {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
{
for (final String index : indices) {
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params ignored) {
return Iterators.concat(
Iterators.single((builder, params) -> builder.startObject()),
Arrays.stream(indices).<ToXContent>map(index -> (builder, params) -> {
builder.startObject(index);
{
builder.startObject("aliases");
List<AliasMetadata> indexAliases = aliases.get(index);
if (indexAliases != null) {
for (final AliasMetadata alias : indexAliases) {
AliasMetadata.Builder.toXContent(alias, builder, params);
}
}
builder.endObject();

MappingMetadata indexMappings = mappings.get(index);
if (indexMappings == null) {
builder.startObject("mappings").endObject();
} else {
if (builder.getRestApiVersion() == RestApiVersion.V_7
&& params.paramAsBoolean(INCLUDE_TYPE_NAME_PARAMETER, DEFAULT_INCLUDE_TYPE_NAME_POLICY)) {
builder.startObject("mappings");
builder.field(MapperService.SINGLE_MAPPING_NAME, indexMappings.sourceAsMap());
builder.endObject();
} else {
builder.field("mappings", indexMappings.sourceAsMap());
}
}

builder.startObject("settings");
Settings indexSettings = settings.get(index);
if (indexSettings != null) {
indexSettings.toXContent(builder, params);
builder.startObject("aliases");
List<AliasMetadata> indexAliases = aliases.get(index);
if (indexAliases != null) {
for (final AliasMetadata alias : indexAliases) {
AliasMetadata.Builder.toXContent(alias, builder, params);
}
builder.endObject();
}
builder.endObject();

Settings defaultIndexSettings = defaultSettings.get(index);
if (defaultIndexSettings != null && defaultIndexSettings.isEmpty() == false) {
builder.startObject("defaults");
defaultIndexSettings.toXContent(builder, params);
MappingMetadata indexMappings = mappings.get(index);
if (indexMappings == null) {
builder.startObject("mappings").endObject();
} else {
if (builder.getRestApiVersion() == RestApiVersion.V_7
&& params.paramAsBoolean(INCLUDE_TYPE_NAME_PARAMETER, DEFAULT_INCLUDE_TYPE_NAME_POLICY)) {
builder.startObject("mappings");
builder.field(MapperService.SINGLE_MAPPING_NAME, indexMappings.sourceAsMap());
builder.endObject();
} else {
builder.field("mappings", indexMappings.sourceAsMap());
}
}

String dataStream = dataStreams.get(index);
if (dataStream != null) {
builder.field("data_stream", dataStream);
}
builder.startObject("settings");
Settings indexSettings = settings.get(index);
if (indexSettings != null) {
indexSettings.toXContent(builder, params);
}
builder.endObject();
}
}
builder.endObject();
return builder;

Settings defaultIndexSettings = defaultSettings.get(index);
if (defaultIndexSettings != null && defaultIndexSettings.isEmpty() == false) {
builder.startObject("defaults");
defaultIndexSettings.toXContent(builder, params);
builder.endObject();
}

String dataStream = dataStreams.get(index);
if (dataStream != null) {
builder.field("data_stream", dataStream);
}

return builder.endObject();
}).iterator(),
Iterators.single((builder, params) -> builder.endObject())
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.DispatchingRestToXContentListener;
import org.elasticsearch.rest.action.RestCancellableNodeClient;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;

import java.io.IOException;
import java.util.List;
Expand All @@ -39,12 +38,6 @@ public class RestGetIndicesAction extends BaseRestHandler {

private static final Set<String> COMPATIBLE_RESPONSE_PARAMS = addToCopy(Settings.FORMAT_PARAMS, INCLUDE_TYPE_NAME_PARAMETER);

private final ThreadPool threadPool;

public RestGetIndicesAction(ThreadPool threadPool) {
this.threadPool = threadPool;
}

@Override
public List<Route> routes() {
return List.of(new Route(GET, "/{index}"), new Route(HEAD, "/{index}"));
Expand Down Expand Up @@ -76,10 +69,7 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
final var httpChannel = request.getHttpChannel();
return channel -> new RestCancellableNodeClient(client, httpChannel).admin()
.indices()
.getIndex(
getIndexRequest,
new DispatchingRestToXContentListener<>(threadPool.executor(ThreadPool.Names.MANAGEMENT), channel, request)
);
.getIndex(getIndexRequest, new RestChunkedToXContentListener<>(channel));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.RandomCreateIndexGenerator;
import org.elasticsearch.test.AbstractWireSerializingTestCase;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
Expand All @@ -27,6 +29,9 @@
import java.util.Locale;
import java.util.Map;

import static org.elasticsearch.xcontent.ToXContent.EMPTY_PARAMS;
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;

public class GetIndexResponseTests extends AbstractWireSerializingTestCase<GetIndexResponse> {

@Override
Expand Down Expand Up @@ -73,4 +78,18 @@ protected GetIndexResponse createTestInstance() {
}
return new GetIndexResponse(indices, mappings, aliases, settings, defaultSettings, dataStreams);
}

public void testChunking() throws IOException {
final var response = createTestInstance();

try (var builder = jsonBuilder()) {
int chunkCount = 0;
final var iterator = response.toXContentChunked(EMPTY_PARAMS);
while (iterator.hasNext()) {
iterator.next().toXContent(builder, ToXContent.EMPTY_PARAMS);
chunkCount += 1;
}
assertEquals(response.getIndices().length + 2, chunkCount);
} // closing the builder verifies that the XContent is well-formed
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
package org.elasticsearch.rest.action.admin.indices;

import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.common.util.concurrent.DeterministicTaskQueue;
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.test.ESTestCase;
Expand Down Expand Up @@ -37,7 +36,7 @@ public void testIncludeTypeNamesWarning() throws IOException {
Map.of("Content-Type", contentTypeHeader, "Accept", contentTypeHeader)
).withMethod(RestRequest.Method.GET).withPath("/some_index").withParams(params).build();

RestGetIndicesAction handler = new RestGetIndicesAction(new DeterministicTaskQueue().getThreadPool());
RestGetIndicesAction handler = new RestGetIndicesAction();
handler.prepareRequest(request, mock(NodeClient.class));
assertCriticalWarnings(RestGetIndicesAction.TYPES_DEPRECATION_MESSAGE);

Expand All @@ -58,7 +57,7 @@ public void testIncludeTypeNamesWarningExists() throws IOException {
Map.of("Content-Type", contentTypeHeader, "Accept", contentTypeHeader)
).withMethod(RestRequest.Method.HEAD).withPath("/some_index").withParams(params).build();

RestGetIndicesAction handler = new RestGetIndicesAction(new DeterministicTaskQueue().getThreadPool());
RestGetIndicesAction handler = new RestGetIndicesAction();
handler.prepareRequest(request, mock(NodeClient.class));
}
}

0 comments on commit 75de8f8

Please sign in to comment.