Skip to content

Commit

Permalink
Add support for cancelling async requests in low-level REST client (e…
Browse files Browse the repository at this point in the history
…lastic#45379)

The low-level REST client exposes a `performRequestAsync` method that
allows to send async requests, but today it does not expose the ability
to cancel such requests. That is something that the underlying apache
async http client supports, and it makes sense for us to expose.

This commit adds a return value to the `performRequestAsync` method,
which is backwards compatible. A `Cancellable` object gets returned,
which exposes a `cancel` public method. When calling `cancel`, the
on-going request associated with the returned `Cancellable` instance
will be cancelled by calling its `abort` method. This works throughout
multiple retries, though some special care was needed for the case where
`cancel` is called between different attempts (when one attempt has
failed and the consecutive one has not been sent yet).

Note that cancelling a request on the client side does not automatically 
translate to cancelling the server side execution of it. That needs to be 
specifically implemented, which is on the work for the search API (see elastic#43332).

Relates to elastic#44802
  • Loading branch information
javanna committed Sep 11, 2019
1 parent 80bb08f commit 5d16d35
Show file tree
Hide file tree
Showing 8 changed files with 366 additions and 86 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;

import org.apache.http.client.methods.AbstractExecutionAwareRequest;
import org.apache.http.client.methods.HttpRequestBase;

import java.util.concurrent.CancellationException;

/**
* Represents an operation that can be cancelled.
* Returned when executing async requests through {@link RestClient#performRequestAsync(Request, ResponseListener)}, so that the request
* can be cancelled if needed. Cancelling a request will result in calling {@link AbstractExecutionAwareRequest#abort()} on the underlying
* request object, which will in turn cancel its corresponding {@link java.util.concurrent.Future}.
* Note that cancelling a request does not automatically translate to aborting its execution on the server side, which needs to be
* specifically implemented in each API.
*/
public class Cancellable {

static final Cancellable NO_OP = new Cancellable(null) {
@Override
public void cancel() {
}

@Override
void runIfNotCancelled(Runnable runnable) {
throw new UnsupportedOperationException();
}
};

static Cancellable fromRequest(HttpRequestBase httpRequest) {
return new Cancellable(httpRequest);
}

private final HttpRequestBase httpRequest;

private Cancellable(HttpRequestBase httpRequest) {
this.httpRequest = httpRequest;
}

/**
* Cancels the on-going request that is associated with the current instance of {@link Cancellable}.
*
*/
public synchronized void cancel() {
this.httpRequest.abort();
}

/**
* Executes some arbitrary code iff the on-going request has not been cancelled, otherwise throws {@link CancellationException}.
* This is needed to guarantee that cancelling a request works correctly even in case {@link #cancel()} is called between different
* attempts of the same request. The low-level client reuses the same instance of the {@link AbstractExecutionAwareRequest} by calling
* {@link AbstractExecutionAwareRequest#reset()} between subsequent retries. The {@link #cancel()} method can be called at anytime,
* and we need to handle the case where it gets called while there is no request being executed as one attempt may have failed and
* the subsequent attempt has not been started yet.
* If the request has already been cancelled we don't go ahead with the next attempt, and artificially raise the
* {@link CancellationException}, otherwise we run the provided {@link Runnable} which will reset the request and send the next attempt.
* Note that this method must be synchronized as well as the {@link #cancel()} method, to prevent a request from being cancelled
* when there is no future to cancel, which would make cancelling the request a no-op.
*/
synchronized void runIfNotCancelled(Runnable runnable) {
if (this.httpRequest.isAborted()) {
throw newCancellationException();
}
runnable.run();
}

static CancellationException newCancellationException() {
return new CancellationException("request was cancelled");
}
}
77 changes: 41 additions & 36 deletions client/rest/src/main/java/org/elasticsearch/client/RestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -277,60 +277,64 @@ private ResponseOrResponseException convertResponse(InternalRequest request, Nod
* @param responseListener the {@link ResponseListener} to notify when the
* request is completed or fails
*/
public void performRequestAsync(Request request, ResponseListener responseListener) {
public Cancellable performRequestAsync(Request request, ResponseListener responseListener) {
try {
FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener);
InternalRequest internalRequest = new InternalRequest(request);
performRequestAsync(nextNodes(), internalRequest, failureTrackingResponseListener);
return internalRequest.cancellable;
} catch (Exception e) {
responseListener.onFailure(e);
return Cancellable.NO_OP;
}
}

private void performRequestAsync(final NodeTuple<Iterator<Node>> nodeTuple,
final InternalRequest request,
final FailureTrackingResponseListener listener) {
final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache);
client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse httpResponse) {
try {
ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse);
if (responseOrResponseException.responseException == null) {
listener.onSuccess(responseOrResponseException.response);
} else {
if (nodeTuple.nodes.hasNext()) {
listener.trackFailure(responseOrResponseException.responseException);
performRequestAsync(nodeTuple, request, listener);
request.cancellable.runIfNotCancelled(() -> {
final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache);
client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse httpResponse) {
try {
ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse);
if (responseOrResponseException.responseException == null) {
listener.onSuccess(responseOrResponseException.response);
} else {
listener.onDefinitiveFailure(responseOrResponseException.responseException);
if (nodeTuple.nodes.hasNext()) {
listener.trackFailure(responseOrResponseException.responseException);
performRequestAsync(nodeTuple, request, listener);
} else {
listener.onDefinitiveFailure(responseOrResponseException.responseException);
}
}
} catch(Exception e) {
listener.onDefinitiveFailure(e);
}
} catch(Exception e) {
listener.onDefinitiveFailure(e);
}
}

@Override
public void failed(Exception failure) {
try {
RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure);
onFailure(context.node);
if (nodeTuple.nodes.hasNext()) {
listener.trackFailure(failure);
performRequestAsync(nodeTuple, request, listener);
} else {
listener.onDefinitiveFailure(failure);
@Override
public void failed(Exception failure) {
try {
RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure);
onFailure(context.node);
if (nodeTuple.nodes.hasNext()) {
listener.trackFailure(failure);
performRequestAsync(nodeTuple, request, listener);
} else {
listener.onDefinitiveFailure(failure);
}
} catch(Exception e) {
listener.onDefinitiveFailure(e);
}
} catch(Exception e) {
listener.onDefinitiveFailure(e);
}
}

@Override
public void cancelled() {
listener.onDefinitiveFailure(new ExecutionException("request was cancelled", null));
}
@Override
public void cancelled() {
listener.onDefinitiveFailure(Cancellable.newCancellationException());
}
});
});
}

Expand Down Expand Up @@ -651,19 +655,20 @@ public void remove() {

private class InternalRequest {
private final Request request;
private final Map<String, String> params;
private final Set<Integer> ignoreErrorCodes;
private final HttpRequestBase httpRequest;
private final Cancellable cancellable;
private final WarningsHandler warningsHandler;

InternalRequest(Request request) {
this.request = request;
this.params = new HashMap<>(request.getParameters());
Map<String, String> params = new HashMap<>(request.getParameters());
//ignore is a special parameter supported by the clients, shouldn't be sent to es
String ignoreString = params.remove("ignore");
this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod());
URI uri = buildUri(pathPrefix, request.getEndpoint(), params);
this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity());
this.cancellable = Cancellable.fromRequest(httpRequest);
setHeaders(httpRequest, request.getOptions().getHeaders());
this.warningsHandler = request.getOptions().getWarningsHandler() == null ?
RestClient.this.warningsHandler : request.getOptions().getWarningsHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.elasticsearch.client.RestClientTestUtil.getAllStatusCodes;
import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode;
import static org.elasticsearch.client.RestClientTestUtil.randomOkStatusCode;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

Expand All @@ -52,6 +55,7 @@
*/
public class RestClientMultipleHostsIntegTests extends RestClientTestCase {

private static WaitForCancelHandler waitForCancelHandler;
private static HttpServer[] httpServers;
private static HttpHost[] httpHosts;
private static boolean stoppedFirstHost = false;
Expand Down Expand Up @@ -94,9 +98,34 @@ private static HttpServer createHttpServer() throws Exception {
for (int statusCode : getAllStatusCodes()) {
httpServer.createContext(pathPrefix + "/" + statusCode, new ResponseHandler(statusCode));
}
waitForCancelHandler = new WaitForCancelHandler();
httpServer.createContext(pathPrefix + "/wait", waitForCancelHandler);
return httpServer;
}

private static class WaitForCancelHandler implements HttpHandler {
private CountDownLatch cancelHandlerLatch;

void reset() {
cancelHandlerLatch = new CountDownLatch(1);
}

void cancelDone() {
cancelHandlerLatch.countDown();
}

@Override
public void handle(HttpExchange exchange) throws IOException {
try {
cancelHandlerLatch.await();
} catch (InterruptedException ignore) {
} finally {
exchange.sendResponseHeaders(200, 0);
exchange.close();
}
}
}

private static class ResponseHandler implements HttpHandler {
private final int statusCode;

Expand Down Expand Up @@ -127,7 +156,7 @@ public void stopRandomHost() {
//verify that shutting down some hosts doesn't matter as long as one working host is left behind
if (httpServers.length > 1 && randomBoolean()) {
List<HttpServer> updatedHttpServers = new ArrayList<>(httpServers.length - 1);
int nodeIndex = randomInt(httpServers.length - 1);
int nodeIndex = randomIntBetween(0, httpServers.length - 1);
if (0 == nodeIndex) {
stoppedFirstHost = true;
}
Expand All @@ -139,7 +168,7 @@ public void stopRandomHost() {
updatedHttpServers.add(httpServer);
}
}
httpServers = updatedHttpServers.toArray(new HttpServer[updatedHttpServers.size()]);
httpServers = updatedHttpServers.toArray(new HttpServer[0]);
}
}

Expand Down Expand Up @@ -195,6 +224,40 @@ public void onFailure(Exception exception) {
}
}

public void testCancelAsyncRequests() throws Exception {
int numRequests = randomIntBetween(5, 20);
final CountDownLatch latch = new CountDownLatch(numRequests);
final List<Response> responses = new CopyOnWriteArrayList<>();
final List<Exception> exceptions = new CopyOnWriteArrayList<>();
for (int i = 0; i < numRequests; i++) {
waitForCancelHandler.reset();
final String method = RestClientTestUtil.randomHttpMethod(getRandom());
//we don't test status codes that are subject to retries as they interfere with hosts being stopped
final int statusCode = randomBoolean() ? randomOkStatusCode(getRandom()) : randomErrorNoRetryStatusCode(getRandom());
Cancellable cancellable = restClient.performRequestAsync(new Request(method, "/" + statusCode), new ResponseListener() {
@Override
public void onSuccess(Response response) {
responses.add(response);
latch.countDown();
}

@Override
public void onFailure(Exception exception) {
exceptions.add(exception);
latch.countDown();
}
});
cancellable.cancel();
waitForCancelHandler.cancelDone();
}
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertEquals(0, responses.size());
assertEquals(numRequests, exceptions.size());
for (Exception exception : exceptions) {
assertThat(exception, instanceOf(CancellationException.class));
}
}

/**
* Test host selector against a real server <strong>and</strong>
* test what happens after calling
Expand Down Expand Up @@ -249,13 +312,10 @@ Response getResponse() {
}

private NodeSelector firstPositionNodeSelector() {
return new NodeSelector() {
@Override
public void select(Iterable<Node> nodes) {
for (Iterator<Node> itr = nodes.iterator(); itr.hasNext();) {
if (httpHosts[0] != itr.next().getHost()) {
itr.remove();
}
return nodes -> {
for (Iterator<Node> itr = nodes.iterator(); itr.hasNext();) {
if (httpHosts[0] != itr.next().getHost()) {
itr.remove();
}
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,19 +243,16 @@ public void testRoundRobinRetryErrors() throws Exception {
}

public void testNodeSelector() throws Exception {
NodeSelector firstPositionOnly = new NodeSelector() {
@Override
public void select(Iterable<Node> restClientNodes) {
boolean found = false;
for (Iterator<Node> itr = restClientNodes.iterator(); itr.hasNext();) {
if (nodes.get(0) == itr.next()) {
found = true;
} else {
itr.remove();
}
NodeSelector firstPositionOnly = restClientNodes -> {
boolean found = false;
for (Iterator<Node> itr = restClientNodes.iterator(); itr.hasNext();) {
if (nodes.get(0) == itr.next()) {
found = true;
} else {
itr.remove();
}
assertTrue(found);
}
assertTrue(found);
};
RestClient restClient = createRestClient(firstPositionOnly);
int rounds = between(1, 10);
Expand Down
Loading

0 comments on commit 5d16d35

Please sign in to comment.