From a88bea9d5d34638a8a094fe6e5d3f98cda808d1d Mon Sep 17 00:00:00 2001 From: David Turner Date: Thu, 24 Nov 2022 17:56:52 +0000 Subject: [PATCH] Chunked encoding for pending tasks API (#91929) This response can reach a few MiB in size in an overwhelmed cluster, let's use chunking so as not to make things worse than they already are. Relates #89838 --- .../cluster/tasks/PendingTasksBlocksIT.java | 6 +-- .../cluster/service/ClusterServiceIT.java | 4 +- .../tasks/PendingClusterTasksResponse.java | 41 +++++++--------- .../RestPendingClusterTasksAction.java | 6 ++- .../cat/RestPendingClusterTasksAction.java | 4 +- .../PendingClusterTasksResponseTests.java | 49 +++++++++++++++++++ .../elasticsearch/test/ESIntegTestCase.java | 6 ++- 7 files changed, 82 insertions(+), 34 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponseTests.java diff --git a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/tasks/PendingTasksBlocksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/tasks/PendingTasksBlocksIT.java index 8b9173ed99457..568353ab2f1b9 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/tasks/PendingTasksBlocksIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/action/admin/cluster/tasks/PendingTasksBlocksIT.java @@ -38,7 +38,7 @@ public void testPendingTasksWithIndexBlocks() { try { enableIndexBlock("test", blockSetting); PendingClusterTasksResponse response = client().admin().cluster().preparePendingClusterTasks().get(); - assertNotNull(response.getPendingTasks()); + assertNotNull(response.pendingTasks()); } finally { disableIndexBlock("test", blockSetting); } @@ -54,7 +54,7 @@ public void testPendingTasksWithClusterReadOnlyBlock() { try { setClusterReadOnly(true); PendingClusterTasksResponse response = client().admin().cluster().preparePendingClusterTasks().get(); - assertNotNull(response.getPendingTasks()); + assertNotNull(response.pendingTasks()); } finally { setClusterReadOnly(false); } @@ -80,7 +80,7 @@ public boolean validateClusterForming() { } }); - assertNotNull(client().admin().cluster().preparePendingClusterTasks().get().getPendingTasks()); + assertNotNull(client().admin().cluster().preparePendingClusterTasks().get().pendingTasks()); // starting one more node allows the cluster to recover internalCluster().startNode(); diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java index 064949bf4bae5..cd2f3ebf561d5 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/service/ClusterServiceIT.java @@ -366,7 +366,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(10)); assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1")); assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true)); - for (PendingClusterTask task : response) { + for (PendingClusterTask task : response.pendingTasks()) { controlSources.remove(task.getSource().string()); } assertTrue(controlSources.isEmpty()); @@ -431,7 +431,7 @@ public void onFailure(Exception e) { response = internalCluster().coordOnlyNodeClient().admin().cluster().preparePendingClusterTasks().get(); assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(5)); controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5")); - for (PendingClusterTask task : response) { + for (PendingClusterTask task : response.pendingTasks()) { if (controlSources.remove(task.getSource().string())) { assertThat(task.getTimeInQueueInMillis(), greaterThan(0L)); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java b/server/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java index 04fc4075380ab..ec9c245381e43 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java +++ b/server/src/main/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponse.java @@ -10,16 +10,17 @@ import org.elasticsearch.action.ActionResponse; import org.elasticsearch.cluster.service.PendingClusterTask; +import org.elasticsearch.common.collect.Iterators; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; -import org.elasticsearch.xcontent.ToXContentObject; -import org.elasticsearch.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.ChunkedToXContent; +import org.elasticsearch.xcontent.ToXContent; import java.io.IOException; import java.util.Iterator; import java.util.List; -public class PendingClusterTasksResponse extends ActionResponse implements Iterable, ToXContentObject { +public class PendingClusterTasksResponse extends ActionResponse implements ChunkedToXContent { private final List pendingTasks; @@ -36,23 +37,11 @@ public List pendingTasks() { return pendingTasks; } - /** - * The pending cluster tasks - */ - public List getPendingTasks() { - return pendingTasks(); - } - - @Override - public Iterator iterator() { - return pendingTasks.iterator(); - } - @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("tasks: (").append(pendingTasks.size()).append("):\n"); - for (PendingClusterTask pendingClusterTask : this) { + for (PendingClusterTask pendingClusterTask : pendingTasks) { sb.append(pendingClusterTask.getInsertOrder()) .append("/") .append(pendingClusterTask.getPriority()) @@ -66,10 +55,12 @@ public String toString() { } @Override - public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.startObject(); - builder.startArray(Fields.TASKS); - for (PendingClusterTask pendingClusterTask : this) { + public Iterator toXContentChunked(ToXContent.Params params) { + return Iterators.concat(Iterators.single((builder, p) -> { + builder.startObject(); + builder.startArray(Fields.TASKS); + return builder; + }), pendingTasks.stream().map(pendingClusterTask -> (builder, p) -> { builder.startObject(); builder.field(Fields.INSERT_ORDER, pendingClusterTask.getInsertOrder()); builder.field(Fields.PRIORITY, pendingClusterTask.getPriority()); @@ -78,10 +69,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.getTimeInQueueInMillis()); builder.field(Fields.TIME_IN_QUEUE, pendingClusterTask.getTimeInQueue()); builder.endObject(); - } - builder.endArray(); - builder.endObject(); - return builder; + return builder; + }).iterator(), Iterators.single((builder, p) -> { + builder.endArray(); + builder.endObject(); + return builder; + })); } static final class Fields { diff --git a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPendingClusterTasksAction.java b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPendingClusterTasksAction.java index b150fe5392559..b5dc4eb50b88f 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPendingClusterTasksAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/admin/cluster/RestPendingClusterTasksAction.java @@ -12,7 +12,7 @@ import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.rest.BaseRestHandler; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.rest.action.RestChunkedToXContentListener; import java.io.IOException; import java.util.List; @@ -36,6 +36,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest(); pendingClusterTasksRequest.masterNodeTimeout(request.paramAsTime("master_timeout", pendingClusterTasksRequest.masterNodeTimeout())); pendingClusterTasksRequest.local(request.paramAsBoolean("local", pendingClusterTasksRequest.local())); - return channel -> client.admin().cluster().pendingClusterTasks(pendingClusterTasksRequest, new RestToXContentListener<>(channel)); + return channel -> client.admin() + .cluster() + .pendingClusterTasks(pendingClusterTasksRequest, new RestChunkedToXContentListener<>(channel)); } } diff --git a/server/src/main/java/org/elasticsearch/rest/action/cat/RestPendingClusterTasksAction.java b/server/src/main/java/org/elasticsearch/rest/action/cat/RestPendingClusterTasksAction.java index 90f0254b67c7b..56c9d8f555365 100644 --- a/server/src/main/java/org/elasticsearch/rest/action/cat/RestPendingClusterTasksAction.java +++ b/server/src/main/java/org/elasticsearch/rest/action/cat/RestPendingClusterTasksAction.java @@ -66,10 +66,10 @@ protected Table getTableWithHeader(final RestRequest request) { return t; } - private Table buildTable(RestRequest request, PendingClusterTasksResponse tasks) { + private Table buildTable(RestRequest request, PendingClusterTasksResponse response) { Table t = getTableWithHeader(request); - for (PendingClusterTask task : tasks) { + for (PendingClusterTask task : response.pendingTasks()) { t.startRow(); t.addCell(task.getInsertOrder()); t.addCell(task.getTimeInQueue()); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponseTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponseTests.java new file mode 100644 index 0000000000000..9e5c8bedf756e --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/tasks/PendingClusterTasksResponseTests.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.action.admin.cluster.tasks; + +import org.elasticsearch.cluster.service.PendingClusterTask; +import org.elasticsearch.common.Priority; +import org.elasticsearch.common.text.Text; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xcontent.ToXContent; +import org.elasticsearch.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.ArrayList; + +import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder; + +public class PendingClusterTasksResponseTests extends ESTestCase { + public void testPendingClusterTasksResponseChunking() throws IOException { + final var tasks = new ArrayList(); + for (int i = between(0, 10); i > 0; i--) { + tasks.add( + new PendingClusterTask( + randomNonNegativeLong(), + randomFrom(Priority.values()), + new Text(randomAlphaOfLengthBetween(1, 10)), + randomNonNegativeLong(), + randomBoolean() + ) + ); + } + + int chunkCount = 0; + try (XContentBuilder builder = jsonBuilder()) { + final var iterator = new PendingClusterTasksResponse(tasks).toXContentChunked(ToXContent.EMPTY_PARAMS); + while (iterator.hasNext()) { + iterator.next().toXContent(builder, ToXContent.EMPTY_PARAMS); + chunkCount += 1; + } + } // closing the builder verifies that the XContent is well-formed + + assertEquals(tasks.size() + 2, chunkCount); + } +} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index 1e1381929e4b7..68c0274d1dbc9 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -842,7 +842,11 @@ public void waitNoPendingTasksOnAll() throws Exception { ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get(); assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0)); PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().setLocal(true).get(); - assertThat("client " + client + " still has pending tasks " + pendingTasks, pendingTasks, Matchers.emptyIterable()); + assertThat( + "client " + client + " still has pending tasks " + pendingTasks, + pendingTasks.pendingTasks(), + Matchers.emptyIterable() + ); clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get(); assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0)); }