Skip to content

Commit

Permalink
HLRC: Add support for reindex rethrottling (#33832)
Browse files Browse the repository at this point in the history
This change adds support for rethrottling reindex requests to the
RestHighLevelClient.
  • Loading branch information
Christoph Büscher authored and kcm committed Oct 30, 2018
1 parent 0fc1312 commit 74cbe0c
Show file tree
Hide file tree
Showing 9 changed files with 350 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,17 @@ static Request deleteByQuery(DeleteByQueryRequest deleteByQueryRequest) throws I
return request;
}

static Request rethrottle(RethrottleRequest rethrottleRequest) throws IOException {
String endpoint = new EndpointBuilder().addPathPart("_reindex").addPathPart(rethrottleRequest.getTaskId().toString())
.addPathPart("_rethrottle").build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Params params = new Params(request)
.withRequestsPerSecond(rethrottleRequest.getRequestsPerSecond());
// we set "group_by" to "none" because this is the response format we can parse back
params.putParam("group_by", "none");
return request;
}

static Request putScript(PutStoredScriptRequest putStoredScriptRequest) throws IOException {
String endpoint = new EndpointBuilder().addPathPartAsIs("_scripts").addPathPart(putStoredScriptRequest.id()).build();
Request request = new Request(HttpPost.METHOD_NAME, endpoint);
Expand Down Expand Up @@ -719,11 +730,11 @@ Params withRefreshPolicy(RefreshPolicy refreshPolicy) {

Params withRequestsPerSecond(float requestsPerSecond) {
// the default in AbstractBulkByScrollRequest is Float.POSITIVE_INFINITY,
// but we don't want to add that to the URL parameters, instead we leave it out
// but we don't want to add that to the URL parameters, instead we use -1
if (Float.isFinite(requestsPerSecond)) {
return putParam("requests_per_second", Float.toString(requestsPerSecond));
return putParam(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, Float.toString(requestsPerSecond));
} else {
return putParam("requests_per_second", "-1");
return putParam(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, "-1");
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.storedscripts.DeleteStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptRequest;
import org.elasticsearch.action.admin.cluster.storedscripts.GetStoredScriptResponse;
Expand Down Expand Up @@ -474,13 +475,14 @@ public final BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQue
* Asynchronously executes an update by query request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html">
* Update By Query API on elastic.co</a>
* @param updateByQueryRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public final void updateByQueryAsync(UpdateByQueryRequest reindexRequest, RequestOptions options,
public final void updateByQueryAsync(UpdateByQueryRequest updateByQueryRequest, RequestOptions options,
ActionListener<BulkByScrollResponse> listener) {
performRequestAsyncAndParseEntity(
reindexRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
updateByQueryRequest, RequestConverters::updateByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
);
}

Expand All @@ -503,16 +505,45 @@ public final BulkByScrollResponse deleteByQuery(DeleteByQueryRequest deleteByQue
* Asynchronously executes a delete by query request.
* See <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-delete-by-query.html">
* Delete By Query API on elastic.co</a>
* @param deleteByQueryRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public final void deleteByQueryAsync(DeleteByQueryRequest reindexRequest, RequestOptions options,
public final void deleteByQueryAsync(DeleteByQueryRequest deleteByQueryRequest, RequestOptions options,
ActionListener<BulkByScrollResponse> listener) {
performRequestAsyncAndParseEntity(
reindexRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
deleteByQueryRequest, RequestConverters::deleteByQuery, options, BulkByScrollResponse::fromXContent, listener, emptySet()
);
}

/**
* Executes a reindex rethrottling request.
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-rethrottle">
* Reindex rethrottling API on elastic.co</a>
* @param rethrottleRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public final ListTasksResponse reindexRethrottle(RethrottleRequest rethrottleRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(rethrottleRequest, RequestConverters::rethrottle, options, ListTasksResponse::fromXContent,
emptySet());
}

/**
* Executes a reindex rethrottling request.
* See the <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-reindex.html#docs-reindex-rethrottle">
* Reindex rethrottling API on elastic.co</a>
* @param rethrottleRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public final void reindexRethrottleAsync(RethrottleRequest rethrottleRequest, RequestOptions options,
ActionListener<ListTasksResponse> listener) {
performRequestAsyncAndParseEntity(rethrottleRequest, RequestConverters::rethrottle, options, ListTasksResponse::fromXContent,
listener, emptySet());
}

/**
* Pings the remote Elasticsearch cluster and returns true if the ping succeeded, false otherwise
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.client;

import org.elasticsearch.tasks.TaskId;

import java.util.Objects;

/**
* A request changing throttling of a task.
*/
public class RethrottleRequest implements Validatable {

static final String REQUEST_PER_SECOND_PARAMETER = "requests_per_second";

private final TaskId taskId;
private final float requestsPerSecond;

/**
* Create a new {@link RethrottleRequest} which disables any throttling for the given taskId.
* @param taskId the task for which throttling will be disabled
*/
public RethrottleRequest(TaskId taskId) {
this.taskId = taskId;
this.requestsPerSecond = Float.POSITIVE_INFINITY;
}

/**
* Create a new {@link RethrottleRequest} which changes the throttling for the given taskId.
* @param taskId the task that throttling changes will be applied to
* @param requestsPerSecond the number of requests per second that the task should perform. This needs to be a positive value.
*/
public RethrottleRequest(TaskId taskId, float requestsPerSecond) {
Objects.requireNonNull(taskId, "taskId cannot be null");
if (requestsPerSecond <= 0) {
throw new IllegalArgumentException("requestsPerSecond needs to be positive value but was [" + requestsPerSecond+"]");
}
this.taskId = taskId;
this.requestsPerSecond = requestsPerSecond;
}

/**
* @return the task Id
*/
public TaskId getTaskId() {
return taskId;
}

/**
* @return the requests per seconds value
*/
public float getRequestsPerSecond() {
return requestsPerSecond;
}

@Override
public String toString() {
return "RethrottleRequest: taskID = " + taskId +"; reqestsPerSecond = " + requestsPerSecond;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksResponse;
import org.elasticsearch.action.admin.cluster.node.tasks.list.TaskGroup;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
Expand Down Expand Up @@ -52,22 +56,31 @@
import org.elasticsearch.index.query.IdsQueryBuilder;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.DeleteByQueryRequest;
import org.elasticsearch.index.reindex.ReindexAction;
import org.elasticsearch.index.reindex.ReindexRequest;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.tasks.RawTaskStatus;
import org.elasticsearch.tasks.TaskId;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.lessThan;

public class CrudIT extends ESRestHighLevelClientTestCase {

Expand Down Expand Up @@ -631,7 +644,7 @@ public void testBulk() throws IOException {
validateBulkResponses(nbItems, errors, bulkResponse, bulkRequest);
}

public void testReindex() throws IOException {
public void testReindex() throws Exception {
final String sourceIndex = "source1";
final String destinationIndex = "dest";
{
Expand All @@ -642,15 +655,14 @@ public void testReindex() throws IOException {
.build();
createIndex(sourceIndex, settings);
createIndex(destinationIndex, settings);
BulkRequest bulkRequest = new BulkRequest()
.add(new IndexRequest(sourceIndex, "type", "1").source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2").source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(RefreshPolicy.IMMEDIATE);
assertEquals(
RestStatus.OK,
highLevelClient().bulk(
new BulkRequest()
.add(new IndexRequest(sourceIndex, "type", "1")
.source(Collections.singletonMap("foo", "bar"), XContentType.JSON))
.add(new IndexRequest(sourceIndex, "type", "2")
.source(Collections.singletonMap("foo2", "bar2"), XContentType.JSON))
.setRefreshPolicy(RefreshPolicy.IMMEDIATE),
bulkRequest,
RequestOptions.DEFAULT
).status()
);
Expand Down Expand Up @@ -692,6 +704,72 @@ public void testReindex() throws IOException {
assertEquals(0, bulkResponse.getBulkFailures().size());
assertEquals(0, bulkResponse.getSearchFailures().size());
}
{
// test reindex rethrottling
ReindexRequest reindexRequest = new ReindexRequest();
reindexRequest.setSourceIndices(sourceIndex);
reindexRequest.setDestIndex(destinationIndex);

// this following settings are supposed to halt reindexing after first document
reindexRequest.setSourceBatchSize(1);
reindexRequest.setRequestsPerSecond(0.00001f);
final CountDownLatch reindexTaskFinished = new CountDownLatch(1);
highLevelClient().reindexAsync(reindexRequest, RequestOptions.DEFAULT, new ActionListener<BulkByScrollResponse>() {

@Override
public void onResponse(BulkByScrollResponse response) {
reindexTaskFinished.countDown();
}

@Override
public void onFailure(Exception e) {
fail(e.toString());
}
});

TaskGroup taskGroupToRethrottle = findTaskToRethrottle();
assertThat(taskGroupToRethrottle.getChildTasks(), empty());
TaskId taskIdToRethrottle = taskGroupToRethrottle.getTaskInfo().getTaskId();

float requestsPerSecond = 1000f;
ListTasksResponse response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
assertThat(response.getTasks(), hasSize(1));
assertEquals(taskIdToRethrottle, response.getTasks().get(0).getTaskId());
assertThat(response.getTasks().get(0).getStatus(), instanceOf(RawTaskStatus.class));
assertEquals(Float.toString(requestsPerSecond),
((RawTaskStatus) response.getTasks().get(0).getStatus()).toMap().get("requests_per_second").toString());
reindexTaskFinished.await(2, TimeUnit.SECONDS);

// any rethrottling after the reindex is done performed with the same taskId should result in a failure
response = execute(new RethrottleRequest(taskIdToRethrottle, requestsPerSecond),
highLevelClient()::reindexRethrottle, highLevelClient()::reindexRethrottleAsync);
assertTrue(response.getTasks().isEmpty());
assertFalse(response.getNodeFailures().isEmpty());
assertEquals(1, response.getNodeFailures().size());
assertEquals("Elasticsearch exception [type=resource_not_found_exception, reason=task [" + taskIdToRethrottle + "] is missing]",
response.getNodeFailures().get(0).getCause().getMessage());
}
}

private TaskGroup findTaskToRethrottle() throws IOException {
long start = System.nanoTime();
ListTasksRequest request = new ListTasksRequest();
request.setActions(ReindexAction.NAME);
request.setDetailed(true);
do {
ListTasksResponse list = highLevelClient().tasks().list(request, RequestOptions.DEFAULT);
list.rethrowFailures("Finding tasks to rethrottle");
assertThat("tasks are left over from the last execution of this test",
list.getTaskGroups(), hasSize(lessThan(2)));
if (0 == list.getTaskGroups().size()) {
// The parent task hasn't started yet
continue;
}
return list.getTaskGroups().get(0);
} while (System.nanoTime() - start < TimeUnit.SECONDS.toNanos(10));
throw new AssertionError("Couldn't find tasks to rethrottle. Here are the running tasks " +
highLevelClient().tasks().list(request, RequestOptions.DEFAULT));
}

public void testUpdateByQuery() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.elasticsearch.search.rescore.QueryRescorerBuilder;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.completion.CompletionSuggestionBuilder;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.RandomObjects;

Expand Down Expand Up @@ -319,10 +320,10 @@ public void testReindex() throws IOException {
}
if (randomBoolean()) {
float requestsPerSecond = (float) randomDoubleBetween(0.0, 10.0, false);
expectedParams.put("requests_per_second", Float.toString(requestsPerSecond));
expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, Float.toString(requestsPerSecond));
reindexRequest.setRequestsPerSecond(requestsPerSecond);
} else {
expectedParams.put("requests_per_second", "-1");
expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, "-1");
}
if (randomBoolean()) {
reindexRequest.setDestRouting("=cat");
Expand Down Expand Up @@ -465,6 +466,34 @@ public void testDeleteByQuery() throws IOException {
assertToXContentBody(deleteByQueryRequest, request.getEntity());
}

public void testRethrottle() throws IOException {
TaskId taskId = new TaskId(randomAlphaOfLength(10), randomIntBetween(1, 100));
RethrottleRequest rethrottleRequest;
Float requestsPerSecond;
Map<String, String> expectedParams = new HashMap<>();
if (frequently()) {
requestsPerSecond = (float) randomDoubleBetween(0.0, 100.0, true);
rethrottleRequest = new RethrottleRequest(taskId, requestsPerSecond);
expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, Float.toString(requestsPerSecond));
} else {
rethrottleRequest = new RethrottleRequest(taskId);
expectedParams.put(RethrottleRequest.REQUEST_PER_SECOND_PARAMETER, "-1");
}
expectedParams.put("group_by", "none");
Request request = RequestConverters.rethrottle(rethrottleRequest);
assertEquals("/_reindex/" + taskId + "/_rethrottle", request.getEndpoint());
assertEquals(HttpPost.METHOD_NAME, request.getMethod());
assertEquals(expectedParams, request.getParameters());
assertNull(request.getEntity());

// test illegal RethrottleRequest values
Exception e = expectThrows(NullPointerException.class, () -> new RethrottleRequest(null, 1.0f));
assertEquals("taskId cannot be null", e.getMessage());

e = expectThrows(IllegalArgumentException.class, () -> new RethrottleRequest(new TaskId("taskId", 1), -5.0f));
assertEquals("requestsPerSecond needs to be positive value but was [-5.0]", e.getMessage());
}

public void testIndex() throws IOException {
String index = randomAlphaOfLengthBetween(3, 10);
String type = randomAlphaOfLengthBetween(3, 10);
Expand Down
Loading

0 comments on commit 74cbe0c

Please sign in to comment.