Skip to content

Commit

Permalink
Chunked encoding for pending tasks API (#91929)
Browse files Browse the repository at this point in the history
This response can reach a few MiB in size in an overwhelmed cluster,
let's use chunking so as not to make things worse than they already are.

Relates #89838
  • Loading branch information
DaveCTurner authored Nov 24, 2022
1 parent 2d74bb7 commit a88bea9
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void testPendingTasksWithIndexBlocks() {
try {
enableIndexBlock("test", blockSetting);
PendingClusterTasksResponse response = client().admin().cluster().preparePendingClusterTasks().get();
assertNotNull(response.getPendingTasks());
assertNotNull(response.pendingTasks());
} finally {
disableIndexBlock("test", blockSetting);
}
Expand All @@ -54,7 +54,7 @@ public void testPendingTasksWithClusterReadOnlyBlock() {
try {
setClusterReadOnly(true);
PendingClusterTasksResponse response = client().admin().cluster().preparePendingClusterTasks().get();
assertNotNull(response.getPendingTasks());
assertNotNull(response.pendingTasks());
} finally {
setClusterReadOnly(false);
}
Expand All @@ -80,7 +80,7 @@ public boolean validateClusterForming() {
}
});

assertNotNull(client().admin().cluster().preparePendingClusterTasks().get().getPendingTasks());
assertNotNull(client().admin().cluster().preparePendingClusterTasks().get().pendingTasks());

// starting one more node allows the cluster to recover
internalCluster().startNode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(10));
assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1"));
assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true));
for (PendingClusterTask task : response) {
for (PendingClusterTask task : response.pendingTasks()) {
controlSources.remove(task.getSource().string());
}
assertTrue(controlSources.isEmpty());
Expand Down Expand Up @@ -431,7 +431,7 @@ public void onFailure(Exception e) {
response = internalCluster().coordOnlyNodeClient().admin().cluster().preparePendingClusterTasks().get();
assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : response) {
for (PendingClusterTask task : response.pendingTasks()) {
if (controlSources.remove(task.getSource().string())) {
assertThat(task.getTimeInQueueInMillis(), greaterThan(0L));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,17 @@

import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.collect.Iterators;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.xcontent.ToXContentObject;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.ChunkedToXContent;
import org.elasticsearch.xcontent.ToXContent;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public class PendingClusterTasksResponse extends ActionResponse implements Iterable<PendingClusterTask>, ToXContentObject {
public class PendingClusterTasksResponse extends ActionResponse implements ChunkedToXContent {

private final List<PendingClusterTask> pendingTasks;

Expand All @@ -36,23 +37,11 @@ public List<PendingClusterTask> pendingTasks() {
return pendingTasks;
}

/**
* The pending cluster tasks
*/
public List<PendingClusterTask> getPendingTasks() {
return pendingTasks();
}

@Override
public Iterator<PendingClusterTask> iterator() {
return pendingTasks.iterator();
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("tasks: (").append(pendingTasks.size()).append("):\n");
for (PendingClusterTask pendingClusterTask : this) {
for (PendingClusterTask pendingClusterTask : pendingTasks) {
sb.append(pendingClusterTask.getInsertOrder())
.append("/")
.append(pendingClusterTask.getPriority())
Expand All @@ -66,10 +55,12 @@ public String toString() {
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.startArray(Fields.TASKS);
for (PendingClusterTask pendingClusterTask : this) {
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
return Iterators.concat(Iterators.single((builder, p) -> {
builder.startObject();
builder.startArray(Fields.TASKS);
return builder;
}), pendingTasks.stream().<ToXContent>map(pendingClusterTask -> (builder, p) -> {
builder.startObject();
builder.field(Fields.INSERT_ORDER, pendingClusterTask.getInsertOrder());
builder.field(Fields.PRIORITY, pendingClusterTask.getPriority());
Expand All @@ -78,10 +69,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(Fields.TIME_IN_QUEUE_MILLIS, pendingClusterTask.getTimeInQueueInMillis());
builder.field(Fields.TIME_IN_QUEUE, pendingClusterTask.getTimeInQueue());
builder.endObject();
}
builder.endArray();
builder.endObject();
return builder;
return builder;
}).iterator(), Iterators.single((builder, p) -> {
builder.endArray();
builder.endObject();
return builder;
}));
}

static final class Fields {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.rest.action.RestChunkedToXContentListener;

import java.io.IOException;
import java.util.List;
Expand All @@ -36,6 +36,8 @@ public RestChannelConsumer prepareRequest(final RestRequest request, final NodeC
PendingClusterTasksRequest pendingClusterTasksRequest = new PendingClusterTasksRequest();
pendingClusterTasksRequest.masterNodeTimeout(request.paramAsTime("master_timeout", pendingClusterTasksRequest.masterNodeTimeout()));
pendingClusterTasksRequest.local(request.paramAsBoolean("local", pendingClusterTasksRequest.local()));
return channel -> client.admin().cluster().pendingClusterTasks(pendingClusterTasksRequest, new RestToXContentListener<>(channel));
return channel -> client.admin()
.cluster()
.pendingClusterTasks(pendingClusterTasksRequest, new RestChunkedToXContentListener<>(channel));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ protected Table getTableWithHeader(final RestRequest request) {
return t;
}

private Table buildTable(RestRequest request, PendingClusterTasksResponse tasks) {
private Table buildTable(RestRequest request, PendingClusterTasksResponse response) {
Table t = getTableWithHeader(request);

for (PendingClusterTask task : tasks) {
for (PendingClusterTask task : response.pendingTasks()) {
t.startRow();
t.addCell(task.getInsertOrder());
t.addCell(task.getTimeInQueue());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.action.admin.cluster.tasks;

import org.elasticsearch.cluster.service.PendingClusterTask;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xcontent.ToXContent;
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;

import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;

public class PendingClusterTasksResponseTests extends ESTestCase {
public void testPendingClusterTasksResponseChunking() throws IOException {
final var tasks = new ArrayList<PendingClusterTask>();
for (int i = between(0, 10); i > 0; i--) {
tasks.add(
new PendingClusterTask(
randomNonNegativeLong(),
randomFrom(Priority.values()),
new Text(randomAlphaOfLengthBetween(1, 10)),
randomNonNegativeLong(),
randomBoolean()
)
);
}

int chunkCount = 0;
try (XContentBuilder builder = jsonBuilder()) {
final var iterator = new PendingClusterTasksResponse(tasks).toXContentChunked(ToXContent.EMPTY_PARAMS);
while (iterator.hasNext()) {
iterator.next().toXContent(builder, ToXContent.EMPTY_PARAMS);
chunkCount += 1;
}
} // closing the builder verifies that the XContent is well-formed

assertEquals(tasks.size() + 2, chunkCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,11 @@ public void waitNoPendingTasksOnAll() throws Exception {
ClusterHealthResponse clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get();
assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0));
PendingClusterTasksResponse pendingTasks = client.admin().cluster().preparePendingClusterTasks().setLocal(true).get();
assertThat("client " + client + " still has pending tasks " + pendingTasks, pendingTasks, Matchers.emptyIterable());
assertThat(
"client " + client + " still has pending tasks " + pendingTasks,
pendingTasks.pendingTasks(),
Matchers.emptyIterable()
);
clusterHealth = client.admin().cluster().prepareHealth().setLocal(true).get();
assertThat("client " + client + " still has in flight fetch", clusterHealth.getNumberOfInFlightFetch(), equalTo(0));
}
Expand Down

0 comments on commit a88bea9

Please sign in to comment.