From 5059a0b9ec50bc888491289b5c1b5ed162ae5673 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 31 Jan 2019 12:18:22 +0100 Subject: [PATCH 01/13] Remove support for maxRetryTimeout from low-level REST client We have had various reports of problems caused by the maxRetryTimeout setting in the low-level REST client. Such setting was initially added in the attempts to not have requests go through retries if the request already took longer than the provided timeout. The implementation was problematic though as such timeout would also expire in the first request attempt (see #31834), would leave the request executing after expiration causing memory leaks (see #33342), and would not take into account the http client internal queuing (see Given all these issues, my conclusion is that this custom timeout mechanism gives little benefits while causing a lot of harm. We should rather rely on connect and socket timeout exposed by the underlying http client and accept that a request can overall take longer than the configured timeout, which is the case even with a single retry anyways. This commit removes the maxRetryTimeout setting and all of its usages. --- .../org/elasticsearch/client/RestClient.java | 38 ++++--------------- .../client/RestClientBuilder.java | 18 +-------- .../client/RestClientBuilderTests.java | 15 +------- .../client/RestClientMultipleHostsTests.java | 2 +- .../client/RestClientSingleHostTests.java | 2 +- .../elasticsearch/client/RestClientTests.java | 5 +-- .../client/SyncResponseListenerTests.java | 24 ++++++------ .../RestClientDocumentation.java | 10 +---- .../low-level/configuration.asciidoc | 3 +- docs/java-rest/low-level/usage.asciidoc | 9 ----- ...torageRepositoryClientYamlTestSuiteIT.java | 1 - .../upgrades/AbstractRollingTestCase.java | 1 - .../UpgradeClusterClientYamlTestSuiteIT.java | 1 - .../test/rest/ESRestTestCase.java | 6 --- .../xpack/restart/FullClusterRestartIT.java | 1 - .../kerberos/KerberosAuthenticationIT.java | 11 +----- .../UpgradeClusterClientYamlTestSuiteIT.java | 1 - 17 files changed, 30 insertions(+), 118 deletions(-) diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index 175d524f02af5..428f53e0942b2 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -72,7 +72,6 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -103,7 +102,6 @@ public class RestClient implements Closeable { // We don't rely on default headers supported by HttpAsyncClient as those cannot be replaced. // These are package private for tests. final List
defaultHeaders; - private final long maxRetryTimeoutMillis; private final String pathPrefix; private final AtomicInteger lastNodeIndex = new AtomicInteger(0); private final ConcurrentMap blacklist = new ConcurrentHashMap<>(); @@ -112,10 +110,9 @@ public class RestClient implements Closeable { private volatile NodeTuple> nodeTuple; private final WarningsHandler warningsHandler; - RestClient(CloseableHttpAsyncClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders, List nodes, String pathPrefix, + RestClient(CloseableHttpAsyncClient client, Header[] defaultHeaders, List nodes, String pathPrefix, FailureListener failureListener, NodeSelector nodeSelector, boolean strictDeprecationMode) { this.client = client; - this.maxRetryTimeoutMillis = maxRetryTimeoutMillis; this.defaultHeaders = Collections.unmodifiableList(Arrays.asList(defaultHeaders)); this.failureListener = failureListener; this.pathPrefix = pathPrefix; @@ -213,7 +210,7 @@ public List getNodes() { * @throws ResponseException in case Elasticsearch responded with a status code that indicated an error */ public Response performRequest(Request request) throws IOException { - SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis); + SyncResponseListener listener = new SyncResponseListener(); performRequestAsyncNoCatch(request, listener); return listener.get(); } @@ -335,19 +332,10 @@ public void failed(Exception failure) { private void retryIfPossible(Exception exception) { if (nodeTuple.nodes.hasNext()) { - //in case we are retrying, check whether maxRetryTimeout has been reached - long timeElapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime); - long timeout = maxRetryTimeoutMillis - timeElapsedMillis; - if (timeout <= 0) { - IOException retryTimeoutException = new IOException( - "request retries exceeded max retry timeout [" + maxRetryTimeoutMillis + "]", exception); - listener.onDefinitiveFailure(retryTimeoutException); - } else { - listener.trackFailure(exception); - request.reset(); - performRequestAsync(startTime, nodeTuple, request, ignoreErrorCodes, - thisWarningsHandler, httpAsyncResponseConsumerFactory, listener); - } + listener.trackFailure(exception); + request.reset(); + performRequestAsync(startTime, nodeTuple, request, ignoreErrorCodes, + thisWarningsHandler, httpAsyncResponseConsumerFactory, listener); } else { listener.onDefinitiveFailure(exception); } @@ -630,13 +618,6 @@ static class SyncResponseListener implements ResponseListener { private final AtomicReference response = new AtomicReference<>(); private final AtomicReference exception = new AtomicReference<>(); - private final long timeout; - - SyncResponseListener(long timeout) { - assert timeout > 0; - this.timeout = timeout; - } - @Override public void onSuccess(Response response) { Objects.requireNonNull(response, "response must not be null"); @@ -663,15 +644,10 @@ public void onFailure(Exception exception) { */ Response get() throws IOException { try { - //providing timeout is just a safety measure to prevent everlasting waits - //the different client timeouts should already do their jobs - if (latch.await(timeout, TimeUnit.MILLISECONDS) == false) { - throw new IOException("listener timeout after waiting for [" + timeout + "] ms"); - } + latch.await(); } catch (InterruptedException e) { throw new RuntimeException("thread waiting for the response was interrupted", e); } - Exception exception = this.exception.get(); Response response = this.response.get(); if (exception != null) { diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java b/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java index 84cc3ee1667b1..2337cbf1fd029 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java @@ -42,14 +42,12 @@ public final class RestClientBuilder { public static final int DEFAULT_CONNECT_TIMEOUT_MILLIS = 1000; public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 30000; - public static final int DEFAULT_MAX_RETRY_TIMEOUT_MILLIS = DEFAULT_SOCKET_TIMEOUT_MILLIS; public static final int DEFAULT_MAX_CONN_PER_ROUTE = 10; public static final int DEFAULT_MAX_CONN_TOTAL = 30; private static final Header[] EMPTY_HEADERS = new Header[0]; private final List nodes; - private int maxRetryTimeout = DEFAULT_MAX_RETRY_TIMEOUT_MILLIS; private Header[] defaultHeaders = EMPTY_HEADERS; private RestClient.FailureListener failureListener; private HttpClientConfigCallback httpClientConfigCallback; @@ -102,20 +100,6 @@ public RestClientBuilder setFailureListener(RestClient.FailureListener failureLi return this; } - /** - * Sets the maximum timeout (in milliseconds) to honour in case of multiple retries of the same request. - * {@link #DEFAULT_MAX_RETRY_TIMEOUT_MILLIS} if not specified. - * - * @throws IllegalArgumentException if {@code maxRetryTimeoutMillis} is not greater than 0 - */ - public RestClientBuilder setMaxRetryTimeoutMillis(int maxRetryTimeoutMillis) { - if (maxRetryTimeoutMillis <= 0) { - throw new IllegalArgumentException("maxRetryTimeoutMillis must be greater than 0"); - } - this.maxRetryTimeout = maxRetryTimeoutMillis; - return this; - } - /** * Sets the {@link HttpClientConfigCallback} to be used to customize http client configuration * @@ -208,7 +192,7 @@ public CloseableHttpAsyncClient run() { return createHttpClient(); } }); - RestClient restClient = new RestClient(httpClient, maxRetryTimeout, defaultHeaders, nodes, + RestClient restClient = new RestClient(httpClient, defaultHeaders, nodes, pathPrefix, failureListener, nodeSelector, strictDeprecationMode); httpClient.start(); return restClient; diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java index 834748d65de34..1e16d94076a05 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java @@ -82,14 +82,6 @@ public void testBuild() throws IOException { assertNotNull(restClient); } - try { - RestClient.builder(new HttpHost("localhost", 9200)) - .setMaxRetryTimeoutMillis(randomIntBetween(Integer.MIN_VALUE, 0)); - fail("should have failed"); - } catch(IllegalArgumentException e) { - assertEquals("maxRetryTimeoutMillis must be greater than 0", e.getMessage()); - } - try { RestClient.builder(new HttpHost("localhost", 9200)).setDefaultHeaders(null); fail("should have failed"); @@ -156,12 +148,9 @@ public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder reques builder.setDefaultHeaders(headers); } if (randomBoolean()) { - builder.setMaxRetryTimeoutMillis(randomIntBetween(1, Integer.MAX_VALUE)); - } - if (randomBoolean()) { - String pathPrefix = (randomBoolean() ? "/" : "") + randomAsciiOfLengthBetween(2, 5); + String pathPrefix = (randomBoolean() ? "/" : "") + randomAsciiLettersOfLengthBetween(2, 5); while (pathPrefix.length() < 20 && randomBoolean()) { - pathPrefix += "/" + randomAsciiOfLengthBetween(3, 6); + pathPrefix += "/" + randomAsciiLettersOfLengthBetween(3, 6); } builder.setPathPrefix(pathPrefix + (randomBoolean() ? "/" : "")); } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java index 7dd1c4d842bff..1383a0aaf73ac 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java @@ -115,7 +115,7 @@ public void run() { } nodes = Collections.unmodifiableList(nodes); failureListener = new HostsTrackingFailureListener(); - return new RestClient(httpClient, 10000, new Header[0], nodes, null, failureListener, nodeSelector, false); + return new RestClient(httpClient, new Header[0], nodes, null, failureListener, nodeSelector, false); } /** diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java index aaef5404f2802..baee2a45febdb 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java @@ -155,7 +155,7 @@ public void run() { node = new Node(new HttpHost("localhost", 9200)); failureListener = new HostsTrackingFailureListener(); strictDeprecationMode = randomBoolean(); - restClient = new RestClient(httpClient, 10000, defaultHeaders, + restClient = new RestClient(httpClient, defaultHeaders, singletonList(node), null, failureListener, NodeSelector.ANY, strictDeprecationMode); } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java index f3f0f0e58b98d..6b5f8bf907eea 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientTests.java @@ -57,7 +57,7 @@ public class RestClientTests extends RestClientTestCase { public void testCloseIsIdempotent() throws IOException { List nodes = singletonList(new Node(new HttpHost("localhost", 9200))); CloseableHttpAsyncClient closeableHttpAsyncClient = mock(CloseableHttpAsyncClient.class); - RestClient restClient = new RestClient(closeableHttpAsyncClient, 1_000, new Header[0], nodes, null, null, null, false); + RestClient restClient = new RestClient(closeableHttpAsyncClient, new Header[0], nodes, null, null, null, false); restClient.close(); verify(closeableHttpAsyncClient, times(1)).close(); restClient.close(); @@ -353,8 +353,7 @@ private static String assertSelectAllRejected( NodeTuple> nodeTuple, private static RestClient createRestClient() { List nodes = Collections.singletonList(new Node(new HttpHost("localhost", 9200))); - return new RestClient(mock(CloseableHttpAsyncClient.class), randomLongBetween(1_000, 30_000), - new Header[] {}, nodes, null, null, null, false); + return new RestClient(mock(CloseableHttpAsyncClient.class), new Header[] {}, nodes, null, null, null, false); } public void testRoundRobin() throws IOException { diff --git a/client/rest/src/test/java/org/elasticsearch/client/SyncResponseListenerTests.java b/client/rest/src/test/java/org/elasticsearch/client/SyncResponseListenerTests.java index 683b23a596a16..e85417f2b6131 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/SyncResponseListenerTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/SyncResponseListenerTests.java @@ -30,12 +30,12 @@ import org.apache.http.message.BasicRequestLine; import org.apache.http.message.BasicStatusLine; +import javax.net.ssl.SSLHandshakeException; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.net.SocketTimeoutException; import java.net.URISyntaxException; -import javax.net.ssl.SSLHandshakeException; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -69,7 +69,7 @@ static void assertExceptionStackContainsCallingMethod(Exception e) { } public void testOnSuccessNullResponse() { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); + RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); try { syncResponseListener.onSuccess(null); fail("onSuccess should have failed"); @@ -79,7 +79,7 @@ public void testOnSuccessNullResponse() { } public void testOnFailureNullException() { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); + RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); try { syncResponseListener.onFailure(null); fail("onFailure should have failed"); @@ -89,7 +89,7 @@ public void testOnFailureNullException() { } public void testOnSuccess() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); + RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); Response mockResponse = mockResponse(); syncResponseListener.onSuccess(mockResponse); Response response = syncResponseListener.get(); @@ -106,7 +106,7 @@ public void testOnSuccess() throws Exception { } public void testOnFailure() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); + RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); RuntimeException firstException = new RuntimeException("first-test"); syncResponseListener.onFailure(firstException); try { @@ -145,7 +145,7 @@ public void testOnFailure() throws Exception { } public void testRuntimeIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); + RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); RuntimeException runtimeException = new RuntimeException(); syncResponseListener.onFailure(runtimeException); try { @@ -162,7 +162,7 @@ public void testRuntimeIsBuiltCorrectly() throws Exception { } public void testConnectTimeoutExceptionIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); + RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); ConnectTimeoutException timeoutException = new ConnectTimeoutException(); syncResponseListener.onFailure(timeoutException); try { @@ -179,7 +179,7 @@ public void testConnectTimeoutExceptionIsBuiltCorrectly() throws Exception { } public void testSocketTimeoutExceptionIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); + RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); SocketTimeoutException timeoutException = new SocketTimeoutException(); syncResponseListener.onFailure(timeoutException); try { @@ -196,7 +196,7 @@ public void testSocketTimeoutExceptionIsBuiltCorrectly() throws Exception { } public void testConnectionClosedExceptionIsWrapped() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); + RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); ConnectionClosedException closedException = new ConnectionClosedException(randomAsciiAlphanumOfLength(5)); syncResponseListener.onFailure(closedException); try { @@ -213,7 +213,7 @@ public void testConnectionClosedExceptionIsWrapped() throws Exception { } public void testSSLHandshakeExceptionIsWrapped() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); + RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); SSLHandshakeException exception = new SSLHandshakeException(randomAsciiAlphanumOfLength(5)); syncResponseListener.onFailure(exception); try { @@ -230,7 +230,7 @@ public void testSSLHandshakeExceptionIsWrapped() throws Exception { } public void testIOExceptionIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); + RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); IOException ioException = new IOException(); syncResponseListener.onFailure(ioException); try { @@ -247,7 +247,7 @@ public void testIOExceptionIsBuiltCorrectly() throws Exception { } public void testExceptionIsWrapped() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(10000); + RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); //we just need any checked exception URISyntaxException exception = new URISyntaxException("test", "test"); syncResponseListener.onFailure(exception); diff --git a/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java b/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java index 7eae17d83cf2b..8653db4226fe1 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java +++ b/client/rest/src/test/java/org/elasticsearch/client/documentation/RestClientDocumentation.java @@ -111,13 +111,6 @@ public void usage() throws IOException, InterruptedException { builder.setDefaultHeaders(defaultHeaders); // <1> //end::rest-client-init-default-headers } - { - //tag::rest-client-init-max-retry-timeout - RestClientBuilder builder = RestClient.builder( - new HttpHost("localhost", 9200, "http")); - builder.setMaxRetryTimeoutMillis(10000); // <1> - //end::rest-client-init-max-retry-timeout - } { //tag::rest-client-init-node-selector RestClientBuilder builder = RestClient.builder( @@ -305,8 +298,7 @@ public RequestConfig.Builder customizeRequestConfig( .setConnectTimeout(5000) .setSocketTimeout(60000); } - }) - .setMaxRetryTimeoutMillis(60000); + }); //end::rest-client-config-timeouts } { diff --git a/docs/java-rest/low-level/configuration.asciidoc b/docs/java-rest/low-level/configuration.asciidoc index b7da2b5ebccff..e284b52c67a67 100644 --- a/docs/java-rest/low-level/configuration.asciidoc +++ b/docs/java-rest/low-level/configuration.asciidoc @@ -18,8 +18,7 @@ https://hc.apache.org/httpcomponents-client-ga/httpclient/apidocs/org/apache/htt as an argument and has the same return type. The request config builder can be modified and then returned. In the following example we increase the connect timeout (defaults to 1 second) and the socket timeout (defaults to 30 -seconds). Also we adjust the max retry timeout accordingly (defaults to 30 -seconds too). +seconds). ["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- diff --git a/docs/java-rest/low-level/usage.asciidoc b/docs/java-rest/low-level/usage.asciidoc index 3747314b6ecd3..ee1555019dbe1 100644 --- a/docs/java-rest/low-level/usage.asciidoc +++ b/docs/java-rest/low-level/usage.asciidoc @@ -180,15 +180,6 @@ include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-defaul <1> Set the default headers that need to be sent with each request, to prevent having to specify them with each single request -["source","java",subs="attributes,callouts,macros"] --------------------------------------------------- -include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-max-retry-timeout] --------------------------------------------------- -<1> Set the timeout that should be honoured in case multiple attempts are made -for the same request. The default value is 30 seconds, same as the default -socket timeout. In case the socket timeout is customized, the maximum retry -timeout should be adjusted accordingly - ["source","java",subs="attributes,callouts,macros"] -------------------------------------------------- include-tagged::{doc-tests}/RestClientDocumentation.java[rest-client-init-failure-listener] diff --git a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageRepositoryClientYamlTestSuiteIT.java b/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageRepositoryClientYamlTestSuiteIT.java index 720f152265a61..9822da98fde70 100644 --- a/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageRepositoryClientYamlTestSuiteIT.java +++ b/plugins/repository-azure/qa/microsoft-azure-storage/src/test/java/org/elasticsearch/repositories/azure/AzureStorageRepositoryClientYamlTestSuiteIT.java @@ -41,7 +41,6 @@ public static Iterable parameters() throws Exception { protected Settings restClientSettings() { // Give more time to repository-azure to complete the snapshot operations return Settings.builder().put(super.restClientSettings()) - .put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "60s") .put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "60s") .build(); } diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractRollingTestCase.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractRollingTestCase.java index eb5517b7acb56..1c57be7abbaa1 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractRollingTestCase.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/AbstractRollingTestCase.java @@ -59,7 +59,6 @@ protected final Settings restClientSettings() { // increase the timeout here to 90 seconds to handle long waits for a green // cluster health. the waits for green need to be longer than a minute to // account for delayed shards - .put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s") .put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s") .build(); } diff --git a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java index 7932328c8c2f6..e7c111aad1605 100644 --- a/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java +++ b/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java @@ -55,7 +55,6 @@ protected Settings restClientSettings() { // increase the timeout here to 90 seconds to handle long waits for a green // cluster health. the waits for green need to be longer than a minute to // account for delayed shards - .put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s") .put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s") .build(); } diff --git a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java index c363b7f4f6c92..eef69e129c110 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java @@ -92,7 +92,6 @@ public abstract class ESRestTestCase extends ESTestCase { public static final String TRUSTSTORE_PATH = "truststore.path"; public static final String TRUSTSTORE_PASSWORD = "truststore.password"; - public static final String CLIENT_RETRY_TIMEOUT = "client.retry.timeout"; public static final String CLIENT_SOCKET_TIMEOUT = "client.socket.timeout"; public static final String CLIENT_PATH_PREFIX = "client.path.prefix"; @@ -728,11 +727,6 @@ protected static void configureClient(RestClientBuilder builder, Settings settin } builder.setDefaultHeaders(defaultHeaders); } - final String requestTimeoutString = settings.get(CLIENT_RETRY_TIMEOUT); - if (requestTimeoutString != null) { - final TimeValue maxRetryTimeout = TimeValue.parseTimeValue(requestTimeoutString, CLIENT_RETRY_TIMEOUT); - builder.setMaxRetryTimeoutMillis(Math.toIntExact(maxRetryTimeout.getMillis())); - } final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT); if (socketTimeoutString != null) { final TimeValue socketTimeout = TimeValue.parseTimeValue(socketTimeoutString, CLIENT_SOCKET_TIMEOUT); diff --git a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java index d1aefb4000890..e7a6ac0c06337 100644 --- a/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java +++ b/x-pack/qa/full-cluster-restart/src/test/java/org/elasticsearch/xpack/restart/FullClusterRestartIT.java @@ -68,7 +68,6 @@ protected Settings restClientSettings() { // we increase the timeout here to 90 seconds to handle long waits for a green // cluster health. the waits for green need to be longer than a minute to // account for delayed shards - .put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s") .put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s") .build(); } diff --git a/x-pack/qa/kerberos-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/KerberosAuthenticationIT.java b/x-pack/qa/kerberos-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/KerberosAuthenticationIT.java index ff5c24b15edac..0281f38933b03 100644 --- a/x-pack/qa/kerberos-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/KerberosAuthenticationIT.java +++ b/x-pack/qa/kerberos-tests/src/test/java/org/elasticsearch/xpack/security/authc/kerberos/KerberosAuthenticationIT.java @@ -23,6 +23,7 @@ import org.elasticsearch.test.rest.ESRestTestCase; import org.junit.Before; +import javax.security.auth.login.LoginContext; import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; @@ -33,8 +34,6 @@ import java.util.List; import java.util.Map; -import javax.security.auth.login.LoginContext; - import static org.elasticsearch.common.xcontent.XContentHelper.convertToMap; import static org.elasticsearch.xpack.core.security.authc.support.UsernamePasswordToken.basicAuthHeaderValue; import static org.hamcrest.Matchers.contains; @@ -148,13 +147,7 @@ private RestClient buildRestClientForKerberos(final SpnegoHttpClientConfigCallba return restClientBuilder.build(); } - private static void configureRestClientBuilder(final RestClientBuilder restClientBuilder, final Settings settings) - throws IOException { - final String requestTimeoutString = settings.get(CLIENT_RETRY_TIMEOUT); - if (requestTimeoutString != null) { - final TimeValue maxRetryTimeout = TimeValue.parseTimeValue(requestTimeoutString, CLIENT_RETRY_TIMEOUT); - restClientBuilder.setMaxRetryTimeoutMillis(Math.toIntExact(maxRetryTimeout.getMillis())); - } + private static void configureRestClientBuilder(final RestClientBuilder restClientBuilder, final Settings settings) { final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT); if (socketTimeoutString != null) { final TimeValue socketTimeout = TimeValue.parseTimeValue(socketTimeoutString, CLIENT_SOCKET_TIMEOUT); diff --git a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java index 3c7a9cee45562..9374346449c95 100644 --- a/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java +++ b/x-pack/qa/rolling-upgrade/src/test/java/org/elasticsearch/upgrades/UpgradeClusterClientYamlTestSuiteIT.java @@ -68,7 +68,6 @@ protected Settings restClientSettings() { // we increase the timeout here to 90 seconds to handle long waits for a green // cluster health. the waits for green need to be longer than a minute to // account for delayed shards - .put(ESRestTestCase.CLIENT_RETRY_TIMEOUT, "90s") .put(ESRestTestCase.CLIENT_SOCKET_TIMEOUT, "90s") .build(); } From 6a7f7cdee15277d7ee0beab2d8172b2509a119cb Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Thu, 31 Jan 2019 12:24:16 +0100 Subject: [PATCH 02/13] migrate note --- .../migration/migrate_7_0/restclient.asciidoc | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/docs/reference/migration/migrate_7_0/restclient.asciidoc b/docs/reference/migration/migrate_7_0/restclient.asciidoc index 3c0237db6e7b0..0f9acbb471004 100644 --- a/docs/reference/migration/migrate_7_0/restclient.asciidoc +++ b/docs/reference/migration/migrate_7_0/restclient.asciidoc @@ -20,4 +20,12 @@ e.g. `client.index(indexRequest, new Header("name" "value"))` becomes The Cluster Health API used to default to `shards` level to ease migration from transport client that doesn't support the `level` parameter and always returns information including indices and shards details. The level default -value has been aligned with the Elasticsearch default level: `cluster`. \ No newline at end of file +value has been aligned with the Elasticsearch default level: `cluster`. + +=== Low-level REST client changes + +[float] +==== Support for `maxRetryTimeout` removed from RestClient + +`RestClient` and `RestClientBuilder` no longer support the `maxRetryTimeout` +setting. \ No newline at end of file From 86fa432d842fb4230d0cb086309543c75f6ae20c Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 1 Feb 2019 17:30:11 +0100 Subject: [PATCH 03/13] remove unused argument --- .../src/main/java/org/elasticsearch/client/RestClient.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index 428f53e0942b2..b4dab9b641ce3 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -270,13 +270,12 @@ void performRequestAsyncNoCatch(Request request, ResponseListener listener) thro HttpRequestBase httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity()); setHeaders(httpRequest, request.getOptions().getHeaders()); FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener); - long startTime = System.nanoTime(); - performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes, + performRequestAsync(nextNode(), httpRequest, ignoreErrorCodes, request.getOptions().getWarningsHandler() == null ? warningsHandler : request.getOptions().getWarningsHandler(), request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener); } - private void performRequestAsync(final long startTime, final NodeTuple> nodeTuple, final HttpRequestBase request, + private void performRequestAsync(final NodeTuple> nodeTuple, final HttpRequestBase request, final Set ignoreErrorCodes, final WarningsHandler thisWarningsHandler, final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory, @@ -334,7 +333,7 @@ private void retryIfPossible(Exception exception) { if (nodeTuple.nodes.hasNext()) { listener.trackFailure(exception); request.reset(); - performRequestAsync(startTime, nodeTuple, request, ignoreErrorCodes, + performRequestAsync(nodeTuple, request, ignoreErrorCodes, thisWarningsHandler, httpAsyncResponseConsumerFactory, listener); } else { listener.onDefinitiveFailure(exception); From 29fd5a8f16add482de78dad0eac984a4621483ba Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Fri, 1 Feb 2019 20:01:28 +0100 Subject: [PATCH 04/13] update javadocs --- .../src/main/java/org/elasticsearch/client/RestClient.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index b4dab9b641ce3..101c5bed500f4 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -610,7 +610,7 @@ void trackFailure(Exception exception) { } /** - * Listener used in any sync performRequest calls, it waits for a response or an exception back up to a timeout + * Listener used in any sync performRequest calls, it waits for a response or an exception back from the http client */ static class SyncResponseListener implements ResponseListener { private final CountDownLatch latch = new CountDownLatch(1); @@ -639,7 +639,7 @@ public void onFailure(Exception exception) { } /** - * Waits (up to a timeout) for some result of the request: either a response, or an exception. + * Waits for some result of the request: either a response, or an exception. */ Response get() throws IOException { try { From c3a41bcce0df30c467519a875cd5af64e48ff874 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 4 Feb 2019 17:50:19 +0100 Subject: [PATCH 05/13] revisit sync calls to rely on future.get rather than countdownlatch --- .../client/ResponseException.java | 10 - .../org/elasticsearch/client/RestClient.java | 414 +++++++++--------- .../client/RestClientMultipleHostsTests.java | 35 +- .../client/RestClientSingleHostTests.java | 45 +- .../client/SyncResponseListenerTests.java | 273 ------------ 5 files changed, 242 insertions(+), 535 deletions(-) delete mode 100644 client/rest/src/test/java/org/elasticsearch/client/SyncResponseListenerTests.java diff --git a/client/rest/src/main/java/org/elasticsearch/client/ResponseException.java b/client/rest/src/main/java/org/elasticsearch/client/ResponseException.java index 4d57f12742e03..e133776affb9c 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/ResponseException.java +++ b/client/rest/src/main/java/org/elasticsearch/client/ResponseException.java @@ -39,16 +39,6 @@ public ResponseException(Response response) throws IOException { this.response = response; } - /** - * Wrap a {@linkplain ResponseException} with another one with the current - * stack trace. This is used during synchronous calls so that the caller - * ends up in the stack trace of the exception thrown. - */ - ResponseException(ResponseException e) throws IOException { - super(e.getMessage(), e); - this.response = e.getResponse(); - } - static String buildMessage(Response response) throws IOException { String message = String.format(Locale.ROOT, "method [%s], host [%s], URI [%s], status line [%s]", diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index 101c5bed500f4..bdfaf37077ede 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -20,7 +20,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.http.ConnectionClosedException; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; @@ -39,7 +38,6 @@ import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.client.utils.URIBuilder; import org.apache.http.concurrent.FutureCallback; -import org.apache.http.conn.ConnectTimeoutException; import org.apache.http.impl.auth.BasicScheme; import org.apache.http.impl.client.BasicAuthCache; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; @@ -48,11 +46,8 @@ import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; import org.elasticsearch.client.DeadHostState.TimeSupplier; -import javax.net.ssl.SSLHandshakeException; import java.io.Closeable; import java.io.IOException; -import java.net.ConnectException; -import java.net.SocketTimeoutException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -70,10 +65,8 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonList; @@ -196,12 +189,7 @@ public List getNodes() { * of them does, in which case an {@link IOException} will be thrown. * * This method works by performing an asynchronous call and waiting - * for the result. If the asynchronous call throws an exception we wrap - * it and rethrow it so that the stack trace attached to the exception - * contains the call site. While we attempt to preserve the original - * exception this isn't always possible and likely haven't covered all of - * the cases. You can get the original exception from - * {@link Exception#getCause()}. + * for its result. * * @param request the request to perform * @return the response returned by Elasticsearch @@ -210,9 +198,80 @@ public List getNodes() { * @throws ResponseException in case Elasticsearch responded with a status code that indicated an error */ public Response performRequest(Request request) throws IOException { - SyncResponseListener listener = new SyncResponseListener(); - performRequestAsyncNoCatch(request, listener); - return listener.get(); + InternalRequest internalRequest = new InternalRequest(request); + return performRequest(nextNodes(), internalRequest, null); + } + + private Response performRequest(final NodeTuple> nodeTuple, + final InternalRequest request, + Exception previousException) throws IOException { + RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache); + HttpResponse httpResponse; + try { + httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get(); + } catch(Exception e) { + RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e); + onFailure(context.node); + Exception cause = unwrapExecutionException(e); + addSuppressedException(previousException, cause); + if (nodeTuple.nodes.hasNext()) { + return performRequest(nodeTuple, request, cause); + } + if (cause instanceof IOException) { + throw (IOException)cause; + } + if (cause instanceof RuntimeException) { + throw (RuntimeException)cause; + } + throw new RuntimeException(cause); + } + InternalResponse internalResponse = onResponse(request, context.node, httpResponse); + if (internalResponse.responseException == null) { + return internalResponse.response; + } + addSuppressedException(previousException, internalResponse.responseException); + if (nodeTuple.nodes.hasNext()) { + return performRequest(nodeTuple, request, internalResponse.responseException); + } else { + throw internalResponse.responseException; + } + } + + private static Exception unwrapExecutionException(Exception e) { + if (e instanceof ExecutionException) { + ExecutionException executionException = (ExecutionException)e; + Throwable t = executionException.getCause() == null ? executionException : executionException.getCause(); + if (t instanceof Error) { + throw (Error)t; + } + return (Exception)t; + } + return e; + } + + private InternalResponse onResponse(InternalRequest request, Node node, HttpResponse httpResponse) throws IOException { + RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse); + int statusCode = httpResponse.getStatusLine().getStatusCode(); + Response response = new Response(request.httpRequest.getRequestLine(), node.getHost(), httpResponse); + if (isSuccessfulResponse(statusCode) || request.ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) { + onResponse(node); + if (request.warningsHandler.warningsShouldFailRequest(response.getWarnings())) { + throw new WarningFailureException(response); + } else { + return new InternalResponse(response); + } + } else { + ResponseException responseException = new ResponseException(response); + if (isRetryStatus(statusCode)) { + //mark host dead and retry against next one + onFailure(node); + return new InternalResponse(responseException); + } else { + //mark host alive and don't retry, as the error should be a request problem + onResponse(node); + throw responseException; + } + } } /** @@ -233,84 +292,31 @@ public Response performRequest(Request request) throws IOException { */ public void performRequestAsync(Request request, ResponseListener responseListener) { try { - performRequestAsyncNoCatch(request, responseListener); + FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(responseListener); + InternalRequest internalRequest = new InternalRequest(request); + performRequestAsync(nextNodes(), internalRequest, failureTrackingResponseListener); } catch (Exception e) { responseListener.onFailure(e); } } - void performRequestAsyncNoCatch(Request request, ResponseListener listener) throws IOException { - Map requestParams = new HashMap<>(request.getParameters()); - //ignore is a special parameter supported by the clients, shouldn't be sent to es - String ignoreString = requestParams.remove("ignore"); - Set ignoreErrorCodes; - if (ignoreString == null) { - if (HttpHead.METHOD_NAME.equals(request.getMethod())) { - //404 never causes error if returned for a HEAD request - ignoreErrorCodes = Collections.singleton(404); - } else { - ignoreErrorCodes = Collections.emptySet(); - } - } else { - String[] ignoresArray = ignoreString.split(","); - ignoreErrorCodes = new HashSet<>(); - if (HttpHead.METHOD_NAME.equals(request.getMethod())) { - //404 never causes error if returned for a HEAD request - ignoreErrorCodes.add(404); - } - for (String ignoreCode : ignoresArray) { - try { - ignoreErrorCodes.add(Integer.valueOf(ignoreCode)); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e); - } - } - } - URI uri = buildUri(pathPrefix, request.getEndpoint(), requestParams); - HttpRequestBase httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity()); - setHeaders(httpRequest, request.getOptions().getHeaders()); - FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener); - performRequestAsync(nextNode(), httpRequest, ignoreErrorCodes, - request.getOptions().getWarningsHandler() == null ? warningsHandler : request.getOptions().getWarningsHandler(), - request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener); - } - - private void performRequestAsync(final NodeTuple> nodeTuple, final HttpRequestBase request, - final Set ignoreErrorCodes, - final WarningsHandler thisWarningsHandler, - final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory, + private void performRequestAsync(final NodeTuple> nodeTuple, + final InternalRequest request, final FailureTrackingResponseListener listener) { - final Node node = nodeTuple.nodes.next(); - //we stream the request body if the entity allows for it - final HttpAsyncRequestProducer requestProducer = HttpAsyncMethods.create(node.getHost(), request); - final HttpAsyncResponseConsumer asyncResponseConsumer = - httpAsyncResponseConsumerFactory.createHttpAsyncResponseConsumer(); - final HttpClientContext context = HttpClientContext.create(); - context.setAuthCache(nodeTuple.authCache); - client.execute(requestProducer, asyncResponseConsumer, context, new FutureCallback() { + final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache); + client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback() { @Override public void completed(HttpResponse httpResponse) { try { - RequestLogger.logResponse(logger, request, node.getHost(), httpResponse); - int statusCode = httpResponse.getStatusLine().getStatusCode(); - Response response = new Response(request.getRequestLine(), node.getHost(), httpResponse); - if (isSuccessfulResponse(statusCode) || ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) { - onResponse(node); - if (thisWarningsHandler.warningsShouldFailRequest(response.getWarnings())) { - listener.onDefinitiveFailure(new WarningFailureException(response)); - } else { - listener.onSuccess(response); - } + InternalResponse internalResponse = onResponse(request, context.node, httpResponse); + if (internalResponse.responseException == null) { + listener.onSuccess(internalResponse.response); } else { - ResponseException responseException = new ResponseException(response); - if (isRetryStatus(statusCode)) { - //mark host dead and retry against next one - onFailure(node); - retryIfPossible(responseException); + if (nodeTuple.nodes.hasNext()) { + listener.trackFailure(internalResponse.responseException); + performRequestAsync(nodeTuple, request, listener); } else { - //mark host alive and don't retry, as the error should be a request problem - onResponse(node); - listener.onDefinitiveFailure(responseException); + listener.onDefinitiveFailure(internalResponse.responseException); } } } catch(Exception e) { @@ -321,25 +327,19 @@ public void completed(HttpResponse httpResponse) { @Override public void failed(Exception failure) { try { - RequestLogger.logFailedRequest(logger, request, node, failure); - onFailure(node); - retryIfPossible(failure); + RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure); + onFailure(context.node); + if (nodeTuple.nodes.hasNext()) { + listener.trackFailure(failure); + performRequestAsync(nodeTuple, request, listener); + } else { + listener.onDefinitiveFailure(failure); + } } catch(Exception e) { listener.onDefinitiveFailure(e); } } - private void retryIfPossible(Exception exception) { - if (nodeTuple.nodes.hasNext()) { - listener.trackFailure(exception); - request.reset(); - performRequestAsync(nodeTuple, request, ignoreErrorCodes, - thisWarningsHandler, httpAsyncResponseConsumerFactory, listener); - } else { - listener.onDefinitiveFailure(exception); - } - } - @Override public void cancelled() { listener.onDefinitiveFailure(new ExecutionException("request was cancelled", null)); @@ -347,20 +347,6 @@ public void cancelled() { }); } - private void setHeaders(HttpRequest httpRequest, Collection
requestHeaders) { - // request headers override default headers, so we don't add default headers if they exist as request headers - final Set requestNames = new HashSet<>(requestHeaders.size()); - for (Header requestHeader : requestHeaders) { - httpRequest.addHeader(requestHeader); - requestNames.add(requestHeader.getName()); - } - for (Header defaultHeader : defaultHeaders) { - if (requestNames.contains(defaultHeader.getName()) == false) { - httpRequest.addHeader(defaultHeader); - } - } - } - /** * Returns a non-empty {@link Iterator} of nodes to be used for a request * that match the {@link NodeSelector}. @@ -370,7 +356,7 @@ private void setHeaders(HttpRequest httpRequest, Collection
requestHeade * that is closest to being revived. * @throws IOException if no nodes are available */ - private NodeTuple> nextNode() throws IOException { + private NodeTuple> nextNodes() throws IOException { NodeTuple> nodeTuple = this.nodeTuple; Iterable hosts = selectNodes(nodeTuple, blacklist, lastNodeIndex, nodeSelector); return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache); @@ -504,11 +490,10 @@ private static boolean isRetryStatus(int statusCode) { return false; } - private static Exception addSuppressedException(Exception suppressedException, Exception currentException) { + private static void addSuppressedException(Exception suppressedException, Exception currentException) { if (suppressedException != null) { currentException.addSuppressed(suppressedException); } - return currentException; } private static HttpRequestBase createHttpRequest(String method, URI uri, HttpEntity entity) { @@ -605,106 +590,8 @@ void onDefinitiveFailure(Exception exception) { * Tracks an exception, which caused a retry hence we should not return yet to the caller */ void trackFailure(Exception exception) { - this.exception = addSuppressedException(this.exception, exception); - } - } - - /** - * Listener used in any sync performRequest calls, it waits for a response or an exception back from the http client - */ - static class SyncResponseListener implements ResponseListener { - private final CountDownLatch latch = new CountDownLatch(1); - private final AtomicReference response = new AtomicReference<>(); - private final AtomicReference exception = new AtomicReference<>(); - - @Override - public void onSuccess(Response response) { - Objects.requireNonNull(response, "response must not be null"); - boolean wasResponseNull = this.response.compareAndSet(null, response); - if (wasResponseNull == false) { - throw new IllegalStateException("response is already set"); - } - - latch.countDown(); - } - - @Override - public void onFailure(Exception exception) { - Objects.requireNonNull(exception, "exception must not be null"); - boolean wasExceptionNull = this.exception.compareAndSet(null, exception); - if (wasExceptionNull == false) { - throw new IllegalStateException("exception is already set"); - } - latch.countDown(); - } - - /** - * Waits for some result of the request: either a response, or an exception. - */ - Response get() throws IOException { - try { - latch.await(); - } catch (InterruptedException e) { - throw new RuntimeException("thread waiting for the response was interrupted", e); - } - Exception exception = this.exception.get(); - Response response = this.response.get(); - if (exception != null) { - if (response != null) { - IllegalStateException e = new IllegalStateException("response and exception are unexpectedly set at the same time"); - e.addSuppressed(exception); - throw e; - } - /* - * Wrap and rethrow whatever exception we received, copying the type - * where possible so the synchronous API looks as much as possible - * like the asynchronous API. We wrap the exception so that the caller's - * signature shows up in any exception we throw. - */ - if (exception instanceof WarningFailureException) { - throw new WarningFailureException((WarningFailureException) exception); - } - if (exception instanceof ResponseException) { - throw new ResponseException((ResponseException) exception); - } - if (exception instanceof ConnectTimeoutException) { - ConnectTimeoutException e = new ConnectTimeoutException(exception.getMessage()); - e.initCause(exception); - throw e; - } - if (exception instanceof SocketTimeoutException) { - SocketTimeoutException e = new SocketTimeoutException(exception.getMessage()); - e.initCause(exception); - throw e; - } - if (exception instanceof ConnectionClosedException) { - ConnectionClosedException e = new ConnectionClosedException(exception.getMessage()); - e.initCause(exception); - throw e; - } - if (exception instanceof SSLHandshakeException) { - SSLHandshakeException e = new SSLHandshakeException(exception.getMessage()); - e.initCause(exception); - throw e; - } - if (exception instanceof ConnectException) { - ConnectException e = new ConnectException(exception.getMessage()); - e.initCause(exception); - throw e; - } - if (exception instanceof IOException) { - throw new IOException(exception.getMessage(), exception); - } - if (exception instanceof RuntimeException){ - throw new RuntimeException(exception.getMessage(), exception); - } - throw new RuntimeException("error while performing request", exception); - } - - if (response == null) { - throw new IllegalStateException("response not set and no exception caught either"); - } - return response; + addSuppressedException(this.exception, exception); + this.exception = exception; } } @@ -783,4 +670,103 @@ public void remove() { itr.remove(); } } + + private class InternalRequest { + private final Request request; + private final Map params; + private final Set ignoreErrorCodes; + private final HttpRequestBase httpRequest; + private final WarningsHandler warningsHandler; + + InternalRequest(Request request) { + this.request = request; + this.params = new HashMap<>(request.getParameters()); + //ignore is a special parameter supported by the clients, shouldn't be sent to es + String ignoreString = params.remove("ignore"); + this.ignoreErrorCodes = getIgnoreErrorCodes(ignoreString, request.getMethod()); + URI uri = buildUri(pathPrefix, request.getEndpoint(), params); + this.httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity()); + setHeaders(httpRequest, request.getOptions().getHeaders()); + this.warningsHandler = request.getOptions().getWarningsHandler() == null ? + RestClient.this.warningsHandler : request.getOptions().getWarningsHandler(); + } + + private void setHeaders(HttpRequest httpRequest, Collection
requestHeaders) { + // request headers override default headers, so we don't add default headers if they exist as request headers + final Set requestNames = new HashSet<>(requestHeaders.size()); + for (Header requestHeader : requestHeaders) { + httpRequest.addHeader(requestHeader); + requestNames.add(requestHeader.getName()); + } + for (Header defaultHeader : defaultHeaders) { + if (requestNames.contains(defaultHeader.getName()) == false) { + httpRequest.addHeader(defaultHeader); + } + } + } + + RequestContext createContextForNextAttempt(Node node, AuthCache authCache) { + this.httpRequest.reset(); + return new RequestContext(this, node, authCache); + } + } + + private static class RequestContext { + private final Node node; + private final HttpAsyncRequestProducer requestProducer; + private final HttpAsyncResponseConsumer asyncResponseConsumer; + private final HttpClientContext context; + + RequestContext(InternalRequest request, Node node, AuthCache authCache) { + this.node = node; + //we stream the request body if the entity allows for it + this.requestProducer = HttpAsyncMethods.create(node.getHost(), request.httpRequest); + this.asyncResponseConsumer = + request.request.getOptions().getHttpAsyncResponseConsumerFactory().createHttpAsyncResponseConsumer(); + this.context = HttpClientContext.create(); + context.setAuthCache(authCache); + } + } + + private static Set getIgnoreErrorCodes(String ignoreString, String requestMethod) { + Set ignoreErrorCodes; + if (ignoreString == null) { + if (HttpHead.METHOD_NAME.equals(requestMethod)) { + //404 never causes error if returned for a HEAD request + ignoreErrorCodes = Collections.singleton(404); + } else { + ignoreErrorCodes = Collections.emptySet(); + } + } else { + String[] ignoresArray = ignoreString.split(","); + ignoreErrorCodes = new HashSet<>(); + if (HttpHead.METHOD_NAME.equals(requestMethod)) { + //404 never causes error if returned for a HEAD request + ignoreErrorCodes.add(404); + } + for (String ignoreCode : ignoresArray) { + try { + ignoreErrorCodes.add(Integer.valueOf(ignoreCode)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e); + } + } + } + return ignoreErrorCodes; + } + + private static class InternalResponse { + private final Response response; + private final ResponseException responseException; + + InternalResponse(Response response) { + this.response = Objects.requireNonNull(response); + this.responseException = null; + } + + InternalResponse(ResponseException responseException) { + this.responseException = Objects.requireNonNull(responseException); + this.response = null; + } + } } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java index 1383a0aaf73ac..b6498ac558a40 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java @@ -47,6 +47,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -87,25 +88,23 @@ public Future answer(InvocationOnMock invocationOnMock) throws Thr final HttpHost httpHost = requestProducer.getTarget(); HttpClientContext context = (HttpClientContext) invocationOnMock.getArguments()[2]; assertThat(context.getAuthCache().get(httpHost), instanceOf(BasicScheme.class)); - final FutureCallback futureCallback = (FutureCallback) invocationOnMock.getArguments()[3]; //return the desired status code or exception depending on the path - exec.execute(new Runnable() { + return exec.submit(new Callable() { @Override - public void run() { + public HttpResponse call() throws Exception { if (request.getURI().getPath().equals("/soe")) { - futureCallback.failed(new SocketTimeoutException(httpHost.toString())); + throw new SocketTimeoutException(httpHost.toString()); } else if (request.getURI().getPath().equals("/coe")) { - futureCallback.failed(new ConnectTimeoutException(httpHost.toString())); + throw new ConnectTimeoutException(httpHost.toString()); } else if (request.getURI().getPath().equals("/ioe")) { - futureCallback.failed(new IOException(httpHost.toString())); + throw new IOException(httpHost.toString()); } else { int statusCode = Integer.parseInt(request.getURI().getPath().substring(1)); StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, ""); - futureCallback.completed(new BasicHttpResponse(statusLine)); + return new BasicHttpResponse(statusLine); } } }); - return null; } }); int numNodes = RandomNumbers.randomIntBetween(getRandom(), 2, 5); @@ -182,11 +181,6 @@ public void testRoundRobinRetryErrors() throws IOException { restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint)); fail("request should have failed"); } catch (ResponseException e) { - /* - * Unwrap the top level failure that was added so the stack trace contains - * the caller. It wraps the exception that contains the failed hosts. - */ - e = (ResponseException) e.getCause(); Set hostsSet = hostsSet(); //first request causes all the hosts to be blacklisted, the returned exception holds one suppressed exception each failureListener.assertCalled(nodes); @@ -206,11 +200,6 @@ public void testRoundRobinRetryErrors() throws IOException { } while(e != null); assertEquals("every host should have been used but some weren't: " + hostsSet, 0, hostsSet.size()); } catch (IOException e) { - /* - * Unwrap the top level failure that was added so the stack trace contains - * the caller. It wraps the exception that contains the failed hosts. - */ - e = (IOException) e.getCause(); Set hostsSet = hostsSet(); //first request causes all the hosts to be blacklisted, the returned exception holds one suppressed exception each failureListener.assertCalled(nodes); @@ -247,11 +236,6 @@ public void testRoundRobinRetryErrors() throws IOException { failureListener.assertCalled(response.getHost()); assertEquals(0, e.getSuppressed().length); } catch (IOException e) { - /* - * Unwrap the top level failure that was added so the stack trace contains - * the caller. It wraps the exception that contains the failed hosts. - */ - e = (IOException) e.getCause(); HttpHost httpHost = HttpHost.create(e.getMessage()); assertTrue("host [" + httpHost + "] not found, most likely used multiple times", hostsSet.remove(httpHost)); //after the first request, all hosts are blacklisted, a single one gets resurrected each time @@ -293,11 +277,6 @@ public void testRoundRobinRetryErrors() throws IOException { assertThat(response.getHost(), equalTo(selectedHost)); failureListener.assertCalled(selectedHost); } catch(IOException e) { - /* - * Unwrap the top level failure that was added so the stack trace contains - * the caller. It wraps the exception that contains the failed hosts. - */ - e = (IOException) e.getCause(); HttpHost httpHost = HttpHost.create(e.getMessage()); assertThat(httpHost, equalTo(selectedHost)); failureListener.assertCalled(selectedHost); diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java index baee2a45febdb..74d1311829b11 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java @@ -56,6 +56,8 @@ import org.mockito.stubbing.Answer; import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; import java.net.SocketTimeoutException; import java.net.URI; import java.util.Arrays; @@ -63,6 +65,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -72,7 +75,6 @@ import static org.elasticsearch.client.RestClientTestUtil.getHttpMethods; import static org.elasticsearch.client.RestClientTestUtil.getOkStatusCodes; import static org.elasticsearch.client.RestClientTestUtil.randomStatusCode; -import static org.elasticsearch.client.SyncResponseListenerTests.assertExceptionStackContainsCallingMethod; import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; @@ -115,14 +117,12 @@ public Future answer(InvocationOnMock invocationOnMock) throws Thr HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0]; HttpClientContext context = (HttpClientContext) invocationOnMock.getArguments()[2]; assertThat(context.getAuthCache().get(node.getHost()), instanceOf(BasicScheme.class)); - final FutureCallback futureCallback = - (FutureCallback) invocationOnMock.getArguments()[3]; HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest(); //return the desired status code or exception depending on the path if (request.getURI().getPath().equals("/soe")) { - futureCallback.failed(new SocketTimeoutException()); + throw new SocketTimeoutException(); } else if (request.getURI().getPath().equals("/coe")) { - futureCallback.failed(new ConnectTimeoutException()); + throw new ConnectTimeoutException(); } else { int statusCode = Integer.parseInt(request.getURI().getPath().substring(1)); StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, ""); @@ -140,14 +140,13 @@ public Future answer(InvocationOnMock invocationOnMock) throws Thr //return the same headers that were sent httpResponse.setHeaders(request.getAllHeaders()); // Call the callback asynchronous to better simulate how async http client works - exec.execute(new Runnable() { + return exec.submit(new Callable() { @Override - public void run() { - futureCallback.completed(httpResponse); + public HttpResponse call() { + return httpResponse; } }); } - return null; } }); @@ -461,7 +460,7 @@ private HttpUriRequest performRandomRequest(String method) throws Exception { //randomly add some ignore parameter, which doesn't get sent as part of the request String ignore = Integer.toString(randomFrom(RestClientTestUtil.getAllErrorStatusCodes())); if (randomBoolean()) { - ignore += "," + Integer.toString(randomFrom(RestClientTestUtil.getAllErrorStatusCodes())); + ignore += "," + randomFrom(RestClientTestUtil.getAllErrorStatusCodes()); } request.addParameter("ignore", ignore); } @@ -528,4 +527,30 @@ private HttpUriRequest performRandomRequest(String method) throws Exception { } return expectedRequest; } + + /** + * Asserts that the provided {@linkplain Exception} contains the method + * that called this somewhere on its stack. This is + * normally the case for synchronous calls but {@link RestClient} performs + * synchronous calls by performing asynchronous calls and blocking the + * current thread until the call returns so it has to take special care + * to make sure that the caller shows up in the exception. We use this + * assertion to make sure that we don't break that "special care". + */ + //TODO do we still need this? + private static void assertExceptionStackContainsCallingMethod(Exception e) { + // 0 is getStackTrace + // 1 is this method + // 2 is the caller, what we want + StackTraceElement myMethod = Thread.currentThread().getStackTrace()[2]; + for (StackTraceElement se : e.getStackTrace()) { + if (se.getClassName().equals(myMethod.getClassName()) + && se.getMethodName().equals(myMethod.getMethodName())) { + return; + } + } + StringWriter stack = new StringWriter(); + e.printStackTrace(new PrintWriter(stack)); + fail("didn't find the calling method (looks like " + myMethod + ") in:\n" + stack); + } } diff --git a/client/rest/src/test/java/org/elasticsearch/client/SyncResponseListenerTests.java b/client/rest/src/test/java/org/elasticsearch/client/SyncResponseListenerTests.java deleted file mode 100644 index e85417f2b6131..0000000000000 --- a/client/rest/src/test/java/org/elasticsearch/client/SyncResponseListenerTests.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.client; - -import org.apache.http.ConnectionClosedException; -import org.apache.http.HttpHost; -import org.apache.http.HttpResponse; -import org.apache.http.ProtocolVersion; -import org.apache.http.RequestLine; -import org.apache.http.StatusLine; -import org.apache.http.conn.ConnectTimeoutException; -import org.apache.http.message.BasicHttpResponse; -import org.apache.http.message.BasicRequestLine; -import org.apache.http.message.BasicStatusLine; - -import javax.net.ssl.SSLHandshakeException; -import java.io.IOException; -import java.io.PrintWriter; -import java.io.StringWriter; -import java.net.SocketTimeoutException; -import java.net.URISyntaxException; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertSame; -import static org.junit.Assert.fail; - -public class SyncResponseListenerTests extends RestClientTestCase { - /** - * Asserts that the provided {@linkplain Exception} contains the method - * that called this somewhere on its stack. This is - * normally the case for synchronous calls but {@link RestClient} performs - * synchronous calls by performing asynchronous calls and blocking the - * current thread until the call returns so it has to take special care - * to make sure that the caller shows up in the exception. We use this - * assertion to make sure that we don't break that "special care". - */ - static void assertExceptionStackContainsCallingMethod(Exception e) { - // 0 is getStackTrace - // 1 is this method - // 2 is the caller, what we want - StackTraceElement myMethod = Thread.currentThread().getStackTrace()[2]; - for (StackTraceElement se : e.getStackTrace()) { - if (se.getClassName().equals(myMethod.getClassName()) - && se.getMethodName().equals(myMethod.getMethodName())) { - return; - } - } - StringWriter stack = new StringWriter(); - e.printStackTrace(new PrintWriter(stack)); - fail("didn't find the calling method (looks like " + myMethod + ") in:\n" + stack); - } - - public void testOnSuccessNullResponse() { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); - try { - syncResponseListener.onSuccess(null); - fail("onSuccess should have failed"); - } catch (NullPointerException e) { - assertEquals("response must not be null", e.getMessage()); - } - } - - public void testOnFailureNullException() { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); - try { - syncResponseListener.onFailure(null); - fail("onFailure should have failed"); - } catch (NullPointerException e) { - assertEquals("exception must not be null", e.getMessage()); - } - } - - public void testOnSuccess() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); - Response mockResponse = mockResponse(); - syncResponseListener.onSuccess(mockResponse); - Response response = syncResponseListener.get(); - assertSame(response, mockResponse); - - try { - syncResponseListener.onSuccess(mockResponse); - fail("get should have failed"); - } catch (IllegalStateException e) { - assertEquals(e.getMessage(), "response is already set"); - } - response = syncResponseListener.get(); - assertSame(response, mockResponse); - } - - public void testOnFailure() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); - RuntimeException firstException = new RuntimeException("first-test"); - syncResponseListener.onFailure(firstException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (RuntimeException e) { - assertEquals(firstException.getMessage(), e.getMessage()); - assertSame(firstException, e.getCause()); - } - - RuntimeException secondException = new RuntimeException("second-test"); - try { - syncResponseListener.onFailure(secondException); - } catch(IllegalStateException e) { - assertEquals(e.getMessage(), "exception is already set"); - } - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (RuntimeException e) { - assertEquals(firstException.getMessage(), e.getMessage()); - assertSame(firstException, e.getCause()); - } - - Response response = mockResponse(); - syncResponseListener.onSuccess(response); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (IllegalStateException e) { - assertEquals("response and exception are unexpectedly set at the same time", e.getMessage()); - assertNotNull(e.getSuppressed()); - assertEquals(1, e.getSuppressed().length); - assertSame(firstException, e.getSuppressed()[0]); - } - } - - public void testRuntimeIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); - RuntimeException runtimeException = new RuntimeException(); - syncResponseListener.onFailure(runtimeException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (RuntimeException e) { - // We preserve the original exception in the cause - assertSame(runtimeException, e.getCause()); - // We copy the message - assertEquals(runtimeException.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testConnectTimeoutExceptionIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); - ConnectTimeoutException timeoutException = new ConnectTimeoutException(); - syncResponseListener.onFailure(timeoutException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (IOException e) { - // We preserve the original exception in the cause - assertSame(timeoutException, e.getCause()); - // We copy the message - assertEquals(timeoutException.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testSocketTimeoutExceptionIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); - SocketTimeoutException timeoutException = new SocketTimeoutException(); - syncResponseListener.onFailure(timeoutException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (IOException e) { - // We preserve the original exception in the cause - assertSame(timeoutException, e.getCause()); - // We copy the message - assertEquals(timeoutException.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testConnectionClosedExceptionIsWrapped() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); - ConnectionClosedException closedException = new ConnectionClosedException(randomAsciiAlphanumOfLength(5)); - syncResponseListener.onFailure(closedException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (ConnectionClosedException e) { - // We preserve the original exception in the cause - assertSame(closedException, e.getCause()); - // We copy the message - assertEquals(closedException.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testSSLHandshakeExceptionIsWrapped() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); - SSLHandshakeException exception = new SSLHandshakeException(randomAsciiAlphanumOfLength(5)); - syncResponseListener.onFailure(exception); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (SSLHandshakeException e) { - // We preserve the original exception in the cause - assertSame(exception, e.getCause()); - // We copy the message - assertEquals(exception.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testIOExceptionIsBuiltCorrectly() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); - IOException ioException = new IOException(); - syncResponseListener.onFailure(ioException); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (IOException e) { - // We preserve the original exception in the cause - assertSame(ioException, e.getCause()); - // We copy the message - assertEquals(ioException.getMessage(), e.getMessage()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - public void testExceptionIsWrapped() throws Exception { - RestClient.SyncResponseListener syncResponseListener = new RestClient.SyncResponseListener(); - //we just need any checked exception - URISyntaxException exception = new URISyntaxException("test", "test"); - syncResponseListener.onFailure(exception); - try { - syncResponseListener.get(); - fail("get should have failed"); - } catch (RuntimeException e) { - assertEquals("error while performing request", e.getMessage()); - // We preserve the original exception in the cause - assertSame(exception, e.getCause()); - // And we do all that so the thrown exception has our method in the stacktrace - assertExceptionStackContainsCallingMethod(e); - } - } - - private static Response mockResponse() { - ProtocolVersion protocolVersion = new ProtocolVersion("HTTP", 1, 1); - RequestLine requestLine = new BasicRequestLine("GET", "/", protocolVersion); - StatusLine statusLine = new BasicStatusLine(protocolVersion, 200, "OK"); - HttpResponse httpResponse = new BasicHttpResponse(statusLine); - return new Response(requestLine, new HttpHost("localhost", 9200), httpResponse); - } -} From 2a2e92da873aa062e376aed1e67b28fc6c827b75 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 4 Feb 2019 18:57:54 +0100 Subject: [PATCH 06/13] clarify migrate note --- docs/reference/migration/migrate_7_0/restclient.asciidoc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/reference/migration/migrate_7_0/restclient.asciidoc b/docs/reference/migration/migrate_7_0/restclient.asciidoc index 0f9acbb471004..39d19c345cd95 100644 --- a/docs/reference/migration/migrate_7_0/restclient.asciidoc +++ b/docs/reference/migration/migrate_7_0/restclient.asciidoc @@ -28,4 +28,5 @@ value has been aligned with the Elasticsearch default level: `cluster`. ==== Support for `maxRetryTimeout` removed from RestClient `RestClient` and `RestClientBuilder` no longer support the `maxRetryTimeout` -setting. \ No newline at end of file +setting. The setting was removed as its counting mechanism was not accurate +and caused issues while adding little value. \ No newline at end of file From 9c26d705ec6e04ede4a6542745a45481b0b76c1f Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 4 Feb 2019 18:58:09 +0100 Subject: [PATCH 07/13] rename internal response --- .../org/elasticsearch/client/RestClient.java | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index bdfaf37077ede..7873709c926a6 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -217,7 +217,7 @@ private Response performRequest(final NodeTuple> nodeTuple, if (nodeTuple.nodes.hasNext()) { return performRequest(nodeTuple, request, cause); } - if (cause instanceof IOException) { + if (cause instanceof IOException) { throw (IOException)cause; } if (cause instanceof RuntimeException) { @@ -225,15 +225,15 @@ private Response performRequest(final NodeTuple> nodeTuple, } throw new RuntimeException(cause); } - InternalResponse internalResponse = onResponse(request, context.node, httpResponse); - if (internalResponse.responseException == null) { - return internalResponse.response; + ResponseOrException responseOrException = onResponse(request, context.node, httpResponse); + if (responseOrException.responseException == null) { + return responseOrException.response; } - addSuppressedException(previousException, internalResponse.responseException); + addSuppressedException(previousException, responseOrException.responseException); if (nodeTuple.nodes.hasNext()) { - return performRequest(nodeTuple, request, internalResponse.responseException); + return performRequest(nodeTuple, request, responseOrException.responseException); } else { - throw internalResponse.responseException; + throw responseOrException.responseException; } } @@ -249,7 +249,7 @@ private static Exception unwrapExecutionException(Exception e) { return e; } - private InternalResponse onResponse(InternalRequest request, Node node, HttpResponse httpResponse) throws IOException { + private ResponseOrException onResponse(InternalRequest request, Node node, HttpResponse httpResponse) throws IOException { RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse); int statusCode = httpResponse.getStatusLine().getStatusCode(); Response response = new Response(request.httpRequest.getRequestLine(), node.getHost(), httpResponse); @@ -258,14 +258,14 @@ private InternalResponse onResponse(InternalRequest request, Node node, HttpResp if (request.warningsHandler.warningsShouldFailRequest(response.getWarnings())) { throw new WarningFailureException(response); } else { - return new InternalResponse(response); + return new ResponseOrException(response); } } else { ResponseException responseException = new ResponseException(response); if (isRetryStatus(statusCode)) { //mark host dead and retry against next one onFailure(node); - return new InternalResponse(responseException); + return new ResponseOrException(responseException); } else { //mark host alive and don't retry, as the error should be a request problem onResponse(node); @@ -308,15 +308,15 @@ private void performRequestAsync(final NodeTuple> nodeTuple, @Override public void completed(HttpResponse httpResponse) { try { - InternalResponse internalResponse = onResponse(request, context.node, httpResponse); - if (internalResponse.responseException == null) { - listener.onSuccess(internalResponse.response); + ResponseOrException responseOrException = onResponse(request, context.node, httpResponse); + if (responseOrException.responseException == null) { + listener.onSuccess(responseOrException.response); } else { if (nodeTuple.nodes.hasNext()) { - listener.trackFailure(internalResponse.responseException); + listener.trackFailure(responseOrException.responseException); performRequestAsync(nodeTuple, request, listener); } else { - listener.onDefinitiveFailure(internalResponse.responseException); + listener.onDefinitiveFailure(responseOrException.responseException); } } } catch(Exception e) { @@ -755,16 +755,16 @@ private static Set getIgnoreErrorCodes(String ignoreString, String requ return ignoreErrorCodes; } - private static class InternalResponse { + private static class ResponseOrException { private final Response response; private final ResponseException responseException; - InternalResponse(Response response) { + ResponseOrException(Response response) { this.response = Objects.requireNonNull(response); this.responseException = null; } - InternalResponse(ResponseException responseException) { + ResponseOrException(ResponseException responseException) { this.responseException = Objects.requireNonNull(responseException); this.response = null; } From d938c034c3fe8f35bcffeede9098a78edbbb23ae Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Mon, 4 Feb 2019 20:25:33 +0100 Subject: [PATCH 08/13] expand tests for other exceptions --- .../client/RestClientSingleHostTests.java | 71 +++++++++++++++++-- 1 file changed, 66 insertions(+), 5 deletions(-) diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java index 74d1311829b11..7a3a724a1e8d9 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java @@ -21,6 +21,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.http.ConnectionClosedException; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpEntityEnclosingRequest; @@ -55,11 +56,13 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import javax.net.ssl.SSLHandshakeException; import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; import java.net.SocketTimeoutException; import java.net.URI; +import java.net.URISyntaxException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -81,6 +84,7 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -123,6 +127,16 @@ public Future answer(InvocationOnMock invocationOnMock) throws Thr throw new SocketTimeoutException(); } else if (request.getURI().getPath().equals("/coe")) { throw new ConnectTimeoutException(); + } else if (request.getURI().getPath().equals("/ioe")) { + throw new IOException(); + } else if (request.getURI().getPath().equals("/closed")) { + throw new ConnectionClosedException(); + } else if (request.getURI().getPath().equals("/handshake")) { + throw new SSLHandshakeException(""); + } else if (request.getURI().getPath().equals("/uri")) { + throw new URISyntaxException("", ""); + } else if (request.getURI().getPath().equals("/runtime")) { + throw new RuntimeException(); } else { int statusCode = Integer.parseInt(request.getURI().getPath().substring(1)); StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, ""); @@ -255,14 +269,24 @@ public void testErrorStatusCodes() throws IOException { } } - public void testIOExceptions() { + public void testExceptions() throws IOException { for (String method : getHttpMethods()) { //IOExceptions should be let bubble up + try { + restClient.performRequest(new Request(method, "/ioe")); + fail("request should have failed"); + } catch(IOException e) { + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); + } + failureListener.assertCalled(singletonList(node)); try { restClient.performRequest(new Request(method, "/coe")); fail("request should have failed"); } catch(IOException e) { assertThat(e, instanceOf(ConnectTimeoutException.class)); + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); } failureListener.assertCalled(singletonList(node)); try { @@ -270,6 +294,44 @@ public void testIOExceptions() { fail("request should have failed"); } catch(IOException e) { assertThat(e, instanceOf(SocketTimeoutException.class)); + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); + } + failureListener.assertCalled(singletonList(node)); + try { + restClient.performRequest(new Request(method, "/closed")); + fail("request should have failed"); + } catch(IOException e) { + assertThat(e, instanceOf(ConnectionClosedException.class)); + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); + } + failureListener.assertCalled(singletonList(node)); + try { + restClient.performRequest(new Request(method, "/handshake")); + fail("request should have failed"); + } catch(IOException e) { + assertThat(e, instanceOf(SSLHandshakeException.class)); + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); + } + failureListener.assertCalled(singletonList(node)); + try { + restClient.performRequest(new Request(method, "/uri")); + fail("request should have failed"); + } catch (RuntimeException e) { + assertThat(e.getCause(), instanceOf(URISyntaxException.class)); + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); + } + failureListener.assertCalled(singletonList(node)); + try { + restClient.performRequest(new Request(method, "/runtime")); + fail("request should have failed"); + } catch (RuntimeException e) { + assertNull(e.getCause()); + // And we do all that so the thrown exception has our method in the stacktrace + assertExceptionStackContainsCallingMethod(e); } failureListener.assertCalled(singletonList(node)); } @@ -537,20 +599,19 @@ private HttpUriRequest performRandomRequest(String method) throws Exception { * to make sure that the caller shows up in the exception. We use this * assertion to make sure that we don't break that "special care". */ - //TODO do we still need this? - private static void assertExceptionStackContainsCallingMethod(Exception e) { + private static void assertExceptionStackContainsCallingMethod(Throwable t) { // 0 is getStackTrace // 1 is this method // 2 is the caller, what we want StackTraceElement myMethod = Thread.currentThread().getStackTrace()[2]; - for (StackTraceElement se : e.getStackTrace()) { + for (StackTraceElement se : t.getStackTrace()) { if (se.getClassName().equals(myMethod.getClassName()) && se.getMethodName().equals(myMethod.getMethodName())) { return; } } StringWriter stack = new StringWriter(); - e.printStackTrace(new PrintWriter(stack)); + t.printStackTrace(new PrintWriter(stack)); fail("didn't find the calling method (looks like " + myMethod + ") in:\n" + stack); } } From 7dc26cc261bc57b0b4ea72e897af1cbda8221051 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 5 Feb 2019 10:49:48 +0100 Subject: [PATCH 09/13] adapt failing test --- .../java/org/elasticsearch/rest/discovery/Zen2RestApiIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/transport-netty4/src/test/java/org/elasticsearch/rest/discovery/Zen2RestApiIT.java b/modules/transport-netty4/src/test/java/org/elasticsearch/rest/discovery/Zen2RestApiIT.java index f26b02696e7e5..bad541384675d 100644 --- a/modules/transport-netty4/src/test/java/org/elasticsearch/rest/discovery/Zen2RestApiIT.java +++ b/modules/transport-netty4/src/test/java/org/elasticsearch/rest/discovery/Zen2RestApiIT.java @@ -158,7 +158,7 @@ public void testFailsOnUnknownNode() throws Exception { } catch (ResponseException e) { assertThat(e.getResponse().getStatusLine().getStatusCode(), is(400)); assertThat( - e.getCause().getMessage(), + e.getMessage(), Matchers.containsString("add voting config exclusions request for [invalid] matched no master-eligible nodes") ); } From ef67306c2a59a632b710475655554da064728d99 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 5 Feb 2019 13:32:15 +0100 Subject: [PATCH 10/13] reintroduce exception wrapping and expand tests --- .../org/elasticsearch/client/RestClient.java | 133 +++++++---- .../RestClientMultipleHostsIntegTests.java | 6 +- .../client/RestClientMultipleHostsTests.java | 82 ++----- .../RestClientSingleHostIntegTests.java | 46 ++-- .../client/RestClientSingleHostTests.java | 213 +++++++++++------- 5 files changed, 265 insertions(+), 215 deletions(-) diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index 7873709c926a6..5e63cdd0776ab 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -20,6 +20,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.http.ConnectionClosedException; import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; @@ -38,6 +39,7 @@ import org.apache.http.client.protocol.HttpClientContext; import org.apache.http.client.utils.URIBuilder; import org.apache.http.concurrent.FutureCallback; +import org.apache.http.conn.ConnectTimeoutException; import org.apache.http.impl.auth.BasicScheme; import org.apache.http.impl.client.BasicAuthCache; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; @@ -46,8 +48,11 @@ import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; import org.elasticsearch.client.DeadHostState.TimeSupplier; +import javax.net.ssl.SSLHandshakeException; import java.io.Closeable; import java.io.IOException; +import java.net.ConnectException; +import java.net.SocketTimeoutException; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -189,7 +194,12 @@ public List getNodes() { * of them does, in which case an {@link IOException} will be thrown. * * This method works by performing an asynchronous call and waiting - * for its result. + * for the result. If the asynchronous call throws an exception we wrap + * it and rethrow it so that the stack trace attached to the exception + * contains the call site. While we attempt to preserve the original + * exception this isn't always possible and likely haven't covered all of + * the cases. You can get the original exception from + * {@link Exception#getCause()}. * * @param request the request to perform * @return the response returned by Elasticsearch @@ -212,44 +222,31 @@ private Response performRequest(final NodeTuple> nodeTuple, } catch(Exception e) { RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e); onFailure(context.node); - Exception cause = unwrapExecutionException(e); + Exception cause = extractAndWrapCause(e); addSuppressedException(previousException, cause); if (nodeTuple.nodes.hasNext()) { return performRequest(nodeTuple, request, cause); } if (cause instanceof IOException) { - throw (IOException)cause; + throw (IOException) cause; } if (cause instanceof RuntimeException) { - throw (RuntimeException)cause; + throw (RuntimeException) cause; } - throw new RuntimeException(cause); + throw new IllegalStateException("cause must be either RuntimeException or IOException", cause); } - ResponseOrException responseOrException = onResponse(request, context.node, httpResponse); - if (responseOrException.responseException == null) { - return responseOrException.response; + ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse); + if (responseOrResponseException.responseException == null) { + return responseOrResponseException.response; } - addSuppressedException(previousException, responseOrException.responseException); + addSuppressedException(previousException, responseOrResponseException.responseException); if (nodeTuple.nodes.hasNext()) { - return performRequest(nodeTuple, request, responseOrException.responseException); - } else { - throw responseOrException.responseException; - } - } - - private static Exception unwrapExecutionException(Exception e) { - if (e instanceof ExecutionException) { - ExecutionException executionException = (ExecutionException)e; - Throwable t = executionException.getCause() == null ? executionException : executionException.getCause(); - if (t instanceof Error) { - throw (Error)t; - } - return (Exception)t; + return performRequest(nodeTuple, request, responseOrResponseException.responseException); } - return e; + throw responseOrResponseException.responseException; } - private ResponseOrException onResponse(InternalRequest request, Node node, HttpResponse httpResponse) throws IOException { + private ResponseOrResponseException convertResponse(InternalRequest request, Node node, HttpResponse httpResponse) throws IOException { RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse); int statusCode = httpResponse.getStatusLine().getStatusCode(); Response response = new Response(request.httpRequest.getRequestLine(), node.getHost(), httpResponse); @@ -257,21 +254,18 @@ private ResponseOrException onResponse(InternalRequest request, Node node, HttpR onResponse(node); if (request.warningsHandler.warningsShouldFailRequest(response.getWarnings())) { throw new WarningFailureException(response); - } else { - return new ResponseOrException(response); - } - } else { - ResponseException responseException = new ResponseException(response); - if (isRetryStatus(statusCode)) { - //mark host dead and retry against next one - onFailure(node); - return new ResponseOrException(responseException); - } else { - //mark host alive and don't retry, as the error should be a request problem - onResponse(node); - throw responseException; } + return new ResponseOrResponseException(response); } + ResponseException responseException = new ResponseException(response); + if (isRetryStatus(statusCode)) { + //mark host dead and retry against next one + onFailure(node); + return new ResponseOrResponseException(responseException); + } + //mark host alive and don't retry, as the error should be a request problem + onResponse(node); + throw responseException; } /** @@ -308,15 +302,15 @@ private void performRequestAsync(final NodeTuple> nodeTuple, @Override public void completed(HttpResponse httpResponse) { try { - ResponseOrException responseOrException = onResponse(request, context.node, httpResponse); - if (responseOrException.responseException == null) { - listener.onSuccess(responseOrException.response); + ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse); + if (responseOrResponseException.responseException == null) { + listener.onSuccess(responseOrResponseException.response); } else { if (nodeTuple.nodes.hasNext()) { - listener.trackFailure(responseOrException.responseException); + listener.trackFailure(responseOrResponseException.responseException); performRequestAsync(nodeTuple, request, listener); } else { - listener.onDefinitiveFailure(responseOrException.responseException); + listener.onDefinitiveFailure(responseOrResponseException.responseException); } } } catch(Exception e) { @@ -755,18 +749,65 @@ private static Set getIgnoreErrorCodes(String ignoreString, String requ return ignoreErrorCodes; } - private static class ResponseOrException { + private static class ResponseOrResponseException { private final Response response; private final ResponseException responseException; - ResponseOrException(Response response) { + ResponseOrResponseException(Response response) { this.response = Objects.requireNonNull(response); this.responseException = null; } - ResponseOrException(ResponseException responseException) { + ResponseOrResponseException(ResponseException responseException) { this.responseException = Objects.requireNonNull(responseException); this.response = null; } } + + /** + * Wrap whatever exception we received, copying the type where possible so the synchronous API looks as much as possible + * like the asynchronous API. We wrap the exception so that the caller's signature shows up in any exception we throw. + */ + private static Exception extractAndWrapCause(Exception exception) { + if (exception instanceof ExecutionException) { + ExecutionException executionException = (ExecutionException)exception; + Throwable t = executionException.getCause() == null ? executionException : executionException.getCause(); + if (t instanceof Error) { + throw (Error)t; + } + exception = (Exception)t; + } + if (exception instanceof ConnectTimeoutException) { + ConnectTimeoutException e = new ConnectTimeoutException(exception.getMessage()); + e.initCause(exception); + return e; + } + if (exception instanceof SocketTimeoutException) { + SocketTimeoutException e = new SocketTimeoutException(exception.getMessage()); + e.initCause(exception); + return e; + } + if (exception instanceof ConnectionClosedException) { + ConnectionClosedException e = new ConnectionClosedException(exception.getMessage()); + e.initCause(exception); + return e; + } + if (exception instanceof SSLHandshakeException) { + SSLHandshakeException e = new SSLHandshakeException(exception.getMessage()); + e.initCause(exception); + return e; + } + if (exception instanceof ConnectException) { + ConnectException e = new ConnectException(exception.getMessage()); + e.initCause(exception); + return e; + } + if (exception instanceof IOException) { + return new IOException(exception.getMessage(), exception); + } + if (exception instanceof RuntimeException){ + return new RuntimeException(exception.getMessage(), exception); + } + return new RuntimeException("error while performing request", exception); + } } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java index 272859e8441e3..4cc16c45bab2f 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsIntegTests.java @@ -199,7 +199,7 @@ public void onFailure(Exception exception) { * Test host selector against a real server and * test what happens after calling */ - public void testNodeSelector() throws IOException { + public void testNodeSelector() throws Exception { try (RestClient restClient = buildRestClient(firstPositionNodeSelector())) { Request request = new Request("GET", "/200"); int rounds = between(1, 10); @@ -210,7 +210,7 @@ public void testNodeSelector() throws IOException { */ if (stoppedFirstHost) { try { - restClient.performRequest(request); + RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); fail("expected to fail to connect"); } catch (ConnectException e) { // Windows isn't consistent here. Sometimes the message is even null! @@ -219,7 +219,7 @@ public void testNodeSelector() throws IOException { } } } else { - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(httpHosts[0], response.getHost()); } } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java index b6498ac558a40..f3df9bf3bfd37 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java @@ -22,35 +22,18 @@ import com.carrotsearch.randomizedtesting.generators.RandomNumbers; import org.apache.http.Header; import org.apache.http.HttpHost; -import org.apache.http.HttpResponse; -import org.apache.http.ProtocolVersion; -import org.apache.http.StatusLine; -import org.apache.http.client.methods.HttpUriRequest; -import org.apache.http.client.protocol.HttpClientContext; -import org.apache.http.concurrent.FutureCallback; -import org.apache.http.conn.ConnectTimeoutException; -import org.apache.http.impl.auth.BasicScheme; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; -import org.apache.http.message.BasicHttpResponse; -import org.apache.http.message.BasicStatusLine; -import org.apache.http.nio.protocol.HttpAsyncRequestProducer; -import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; import org.junit.After; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; import java.io.IOException; -import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.Future; import static org.elasticsearch.client.RestClientTestUtil.randomErrorNoRetryStatusCode; import static org.elasticsearch.client.RestClientTestUtil.randomErrorRetryStatusCode; @@ -62,9 +45,6 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; /** * Tests for {@link RestClient} behaviour against multiple hosts: fail-over, blacklisting etc. @@ -76,37 +56,8 @@ public class RestClientMultipleHostsTests extends RestClientTestCase { private List nodes; private HostsTrackingFailureListener failureListener; - @SuppressWarnings("unchecked") public RestClient createRestClient(NodeSelector nodeSelector) { - CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class); - when(httpClient.execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class), - any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer>() { - @Override - public Future answer(InvocationOnMock invocationOnMock) throws Throwable { - HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0]; - final HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest(); - final HttpHost httpHost = requestProducer.getTarget(); - HttpClientContext context = (HttpClientContext) invocationOnMock.getArguments()[2]; - assertThat(context.getAuthCache().get(httpHost), instanceOf(BasicScheme.class)); - //return the desired status code or exception depending on the path - return exec.submit(new Callable() { - @Override - public HttpResponse call() throws Exception { - if (request.getURI().getPath().equals("/soe")) { - throw new SocketTimeoutException(httpHost.toString()); - } else if (request.getURI().getPath().equals("/coe")) { - throw new ConnectTimeoutException(httpHost.toString()); - } else if (request.getURI().getPath().equals("/ioe")) { - throw new IOException(httpHost.toString()); - } else { - int statusCode = Integer.parseInt(request.getURI().getPath().substring(1)); - StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, ""); - return new BasicHttpResponse(statusLine); - } - } - }); - } - }); + CloseableHttpAsyncClient httpClient = RestClientSingleHostTests.mockHttpClient(exec); int numNodes = RandomNumbers.randomIntBetween(getRandom(), 2, 5); nodes = new ArrayList<>(numNodes); for (int i = 0; i < numNodes; i++) { @@ -125,14 +76,15 @@ public void shutdownExec() { exec.shutdown(); } - public void testRoundRobinOkStatusCodes() throws IOException { + public void testRoundRobinOkStatusCodes() throws Exception { RestClient restClient = createRestClient(NodeSelector.ANY); int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5); for (int i = 0; i < numIters; i++) { Set hostsSet = hostsSet(); for (int j = 0; j < nodes.size(); j++) { int statusCode = randomOkStatusCode(getRandom()); - Response response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode)); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, + new Request(randomHttpMethod(getRandom()), "/" + statusCode)); assertEquals(statusCode, response.getStatusLine().getStatusCode()); assertTrue("host not found: " + response.getHost(), hostsSet.remove(response.getHost())); } @@ -141,7 +93,7 @@ public void testRoundRobinOkStatusCodes() throws IOException { failureListener.assertNotCalled(); } - public void testRoundRobinNoRetryErrors() throws IOException { + public void testRoundRobinNoRetryErrors() throws Exception { RestClient restClient = createRestClient(NodeSelector.ANY); int numIters = RandomNumbers.randomIntBetween(getRandom(), 1, 5); for (int i = 0; i < numIters; i++) { @@ -150,7 +102,8 @@ public void testRoundRobinNoRetryErrors() throws IOException { String method = randomHttpMethod(getRandom()); int statusCode = randomErrorNoRetryStatusCode(getRandom()); try { - Response response = restClient.performRequest(new Request(method, "/" + statusCode)); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, + new Request(method, "/" + statusCode)); if (method.equals("HEAD") && statusCode == 404) { //no exception gets thrown although we got a 404 assertEquals(404, response.getStatusLine().getStatusCode()); @@ -174,11 +127,11 @@ public void testRoundRobinNoRetryErrors() throws IOException { failureListener.assertNotCalled(); } - public void testRoundRobinRetryErrors() throws IOException { + public void testRoundRobinRetryErrors() throws Exception { RestClient restClient = createRestClient(NodeSelector.ANY); String retryEndpoint = randomErrorRetryEndpoint(); try { - restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint)); + RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request(randomHttpMethod(getRandom()), retryEndpoint)); fail("request should have failed"); } catch (ResponseException e) { Set hostsSet = hostsSet(); @@ -225,7 +178,8 @@ public void testRoundRobinRetryErrors() throws IOException { for (int j = 0; j < nodes.size(); j++) { retryEndpoint = randomErrorRetryEndpoint(); try { - restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint)); + RestClientSingleHostTests.performRequestSyncOrAsync(restClient, + new Request(randomHttpMethod(getRandom()), retryEndpoint)); fail("request should have failed"); } catch (ResponseException e) { Response response = e.getResponse(); @@ -252,7 +206,8 @@ public void testRoundRobinRetryErrors() throws IOException { int statusCode = randomErrorNoRetryStatusCode(getRandom()); Response response; try { - response = restClient.performRequest(new Request(randomHttpMethod(getRandom()), "/" + statusCode)); + response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, + new Request(randomHttpMethod(getRandom()), "/" + statusCode)); } catch (ResponseException e) { response = e.getResponse(); } @@ -269,7 +224,8 @@ public void testRoundRobinRetryErrors() throws IOException { for (int y = 0; y < i + 1; y++) { retryEndpoint = randomErrorRetryEndpoint(); try { - restClient.performRequest(new Request(randomHttpMethod(getRandom()), retryEndpoint)); + RestClientSingleHostTests.performRequestSyncOrAsync(restClient, + new Request(randomHttpMethod(getRandom()), retryEndpoint)); fail("request should have failed"); } catch (ResponseException e) { Response response = e.getResponse(); @@ -286,7 +242,7 @@ public void testRoundRobinRetryErrors() throws IOException { } } - public void testNodeSelector() throws IOException { + public void testNodeSelector() throws Exception { NodeSelector firstPositionOnly = new NodeSelector() { @Override public void select(Iterable restClientNodes) { @@ -309,12 +265,12 @@ public void select(Iterable restClientNodes) { * NodeSelector overrides the round robin behavior. */ Request request = new Request("GET", "/200"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(nodes.get(0).getHost(), response.getHost()); } } - public void testSetNodes() throws IOException { + public void testSetNodes() throws Exception { RestClient restClient = createRestClient(NodeSelector.SKIP_DEDICATED_MASTERS); List newNodes = new ArrayList<>(nodes.size()); for (int i = 0; i < nodes.size(); i++) { @@ -329,7 +285,7 @@ public void testSetNodes() throws IOException { * NodeSelector overrides the round robin behavior. */ Request request = new Request("GET", "/200"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(newNodes.get(0).getHost(), response.getHost()); } } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java index fb58f18d42af0..e3fd3c311378b 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostIntegTests.java @@ -206,7 +206,7 @@ public void onFailure(Exception exception) { * to set/add headers to the {@link org.apache.http.client.HttpClient}. * Exercises the test http server ability to send back whatever headers it received. */ - public void testHeaders() throws IOException { + public void testHeaders() throws Exception { for (String method : getHttpMethods()) { final Set standardHeaders = new HashSet<>(Arrays.asList("Connection", "Host", "User-agent", "Date")); if (method.equals("HEAD") == false) { @@ -222,7 +222,7 @@ public void testHeaders() throws IOException { request.setOptions(options); Response esResponse; try { - esResponse = restClient.performRequest(request); + esResponse = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); } catch (ResponseException e) { esResponse = e.getResponse(); } @@ -246,7 +246,7 @@ public void testHeaders() throws IOException { * out of the box by {@link org.apache.http.client.HttpClient}. * Exercises the test http server ability to send back whatever body it received. */ - public void testDeleteWithBody() throws IOException { + public void testDeleteWithBody() throws Exception { bodyTest("DELETE"); } @@ -255,57 +255,57 @@ public void testDeleteWithBody() throws IOException { * out of the box by {@link org.apache.http.client.HttpClient}. * Exercises the test http server ability to send back whatever body it received. */ - public void testGetWithBody() throws IOException { + public void testGetWithBody() throws Exception { bodyTest("GET"); } - public void testEncodeParams() throws IOException { + public void testEncodeParams() throws Exception { { Request request = new Request("PUT", "/200"); request.addParameter("routing", "this/is/the/routing"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=this%2Fis%2Fthe%2Frouting", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "this|is|the|routing"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=this%7Cis%7Cthe%7Crouting", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "routing#1"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=routing%231", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "中文"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=%E4%B8%AD%E6%96%87", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "foo bar"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=foo+bar", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "foo+bar"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=foo%2Bbar", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "foo/bar"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=foo%2Fbar", response.getRequestLine().getUri()); } { Request request = new Request("PUT", "/200"); request.addParameter("routing", "foo^bar"); - Response response = restClient.performRequest(request); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); assertEquals(pathPrefix + "/200?routing=foo%5Ebar", response.getRequestLine().getUri()); } } @@ -313,7 +313,7 @@ public void testEncodeParams() throws IOException { /** * Verify that credentials are sent on the first request with preemptive auth enabled (default when provided with credentials). */ - public void testPreemptiveAuthEnabled() throws IOException { + public void testPreemptiveAuthEnabled() throws Exception { final String[] methods = {"POST", "PUT", "GET", "DELETE"}; try (RestClient restClient = createRestClient(true, true)) { @@ -328,7 +328,7 @@ public void testPreemptiveAuthEnabled() throws IOException { /** * Verify that credentials are not sent on the first request with preemptive auth disabled. */ - public void testPreemptiveAuthDisabled() throws IOException { + public void testPreemptiveAuthDisabled() throws Exception { final String[] methods = {"POST", "PUT", "GET", "DELETE"}; try (RestClient restClient = createRestClient(true, false)) { @@ -343,7 +343,7 @@ public void testPreemptiveAuthDisabled() throws IOException { /** * Verify that credentials continue to be sent even if a 401 (Unauthorized) response is received */ - public void testAuthCredentialsAreNotClearedOnAuthChallenge() throws IOException { + public void testAuthCredentialsAreNotClearedOnAuthChallenge() throws Exception { final String[] methods = {"POST", "PUT", "GET", "DELETE"}; try (RestClient restClient = createRestClient(true, true)) { @@ -362,14 +362,14 @@ public void testAuthCredentialsAreNotClearedOnAuthChallenge() throws IOException public void testUrlWithoutLeadingSlash() throws Exception { if (pathPrefix.length() == 0) { try { - restClient.performRequest(new Request("GET", "200")); + RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request("GET", "200")); fail("request should have failed"); } catch (ResponseException e) { assertEquals(404, e.getResponse().getStatusLine().getStatusCode()); } } else { { - Response response = restClient.performRequest(new Request("GET", "200")); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request("GET", "200")); //a trailing slash gets automatically added if a pathPrefix is configured assertEquals(200, response.getStatusLine().getStatusCode()); } @@ -378,7 +378,7 @@ public void testUrlWithoutLeadingSlash() throws Exception { try (RestClient restClient = RestClient.builder( new HttpHost(httpServer.getAddress().getHostString(), httpServer.getAddress().getPort())) .setPathPrefix(pathPrefix.substring(1)).build()) { - Response response = restClient.performRequest(new Request("GET", "200")); + Response response = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, new Request("GET", "200")); //a trailing slash gets automatically added if a pathPrefix is configured assertEquals(200, response.getStatusLine().getStatusCode()); } @@ -386,16 +386,16 @@ public void testUrlWithoutLeadingSlash() throws Exception { } } - private Response bodyTest(final String method) throws IOException { + private Response bodyTest(final String method) throws Exception { return bodyTest(restClient, method); } - private Response bodyTest(final RestClient restClient, final String method) throws IOException { + private Response bodyTest(final RestClient restClient, final String method) throws Exception { int statusCode = randomStatusCode(getRandom()); return bodyTest(restClient, method, statusCode, new Header[0]); } - private Response bodyTest(RestClient restClient, String method, int statusCode, Header[] headers) throws IOException { + private Response bodyTest(RestClient restClient, String method, int statusCode, Header[] headers) throws Exception { String requestBody = "{ \"field\": \"value\" }"; Request request = new Request(method, "/" + statusCode); request.setJsonEntity(requestBody); @@ -406,7 +406,7 @@ private Response bodyTest(RestClient restClient, String method, int statusCode, request.setOptions(options); Response esResponse; try { - esResponse = restClient.performRequest(request); + esResponse = RestClientSingleHostTests.performRequestSyncOrAsync(restClient, request); } catch(ResponseException e) { esResponse = e.getResponse(); } diff --git a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java index 7a3a724a1e8d9..625a4612f330a 100644 --- a/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java +++ b/client/rest/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java @@ -43,7 +43,6 @@ import org.apache.http.conn.ConnectTimeoutException; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; -import org.apache.http.impl.auth.BasicScheme; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.message.BasicHttpResponse; import org.apache.http.message.BasicStatusLine; @@ -69,9 +68,11 @@ import java.util.List; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; import static java.util.Collections.singletonList; import static org.elasticsearch.client.RestClientTestUtil.getAllErrorStatusCodes; @@ -84,7 +85,6 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -111,65 +111,84 @@ public class RestClientSingleHostTests extends RestClientTestCase { private boolean strictDeprecationMode; @Before - @SuppressWarnings("unchecked") public void createRestClient() { - httpClient = mock(CloseableHttpAsyncClient.class); + httpClient = mockHttpClient(exec); + defaultHeaders = RestClientTestUtil.randomHeaders(getRandom(), "Header-default"); + node = new Node(new HttpHost("localhost", 9200)); + failureListener = new HostsTrackingFailureListener(); + strictDeprecationMode = randomBoolean(); + restClient = new RestClient(this.httpClient, defaultHeaders, + singletonList(node), null, failureListener, NodeSelector.ANY, strictDeprecationMode); + } + + @SuppressWarnings("unchecked") + static CloseableHttpAsyncClient mockHttpClient(final ExecutorService exec) { + CloseableHttpAsyncClient httpClient = mock(CloseableHttpAsyncClient.class); when(httpClient.execute(any(HttpAsyncRequestProducer.class), any(HttpAsyncResponseConsumer.class), - any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer>() { + any(HttpClientContext.class), any(FutureCallback.class))).thenAnswer(new Answer>() { + @Override + public Future answer(InvocationOnMock invocationOnMock) throws Throwable { + final HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0]; + final FutureCallback futureCallback = + (FutureCallback) invocationOnMock.getArguments()[3]; + // Call the callback asynchronous to better simulate how async http client works + return exec.submit(new Callable() { @Override - public Future answer(InvocationOnMock invocationOnMock) throws Throwable { - HttpAsyncRequestProducer requestProducer = (HttpAsyncRequestProducer) invocationOnMock.getArguments()[0]; - HttpClientContext context = (HttpClientContext) invocationOnMock.getArguments()[2]; - assertThat(context.getAuthCache().get(node.getHost()), instanceOf(BasicScheme.class)); - HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest(); - //return the desired status code or exception depending on the path - if (request.getURI().getPath().equals("/soe")) { - throw new SocketTimeoutException(); - } else if (request.getURI().getPath().equals("/coe")) { - throw new ConnectTimeoutException(); - } else if (request.getURI().getPath().equals("/ioe")) { - throw new IOException(); - } else if (request.getURI().getPath().equals("/closed")) { - throw new ConnectionClosedException(); - } else if (request.getURI().getPath().equals("/handshake")) { - throw new SSLHandshakeException(""); - } else if (request.getURI().getPath().equals("/uri")) { - throw new URISyntaxException("", ""); - } else if (request.getURI().getPath().equals("/runtime")) { - throw new RuntimeException(); - } else { - int statusCode = Integer.parseInt(request.getURI().getPath().substring(1)); - StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, ""); - - final HttpResponse httpResponse = new BasicHttpResponse(statusLine); - //return the same body that was sent - if (request instanceof HttpEntityEnclosingRequest) { - HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity(); - if (entity != null) { - assertTrue("the entity is not repeatable, cannot set it to the response directly", - entity.isRepeatable()); - httpResponse.setEntity(entity); - } + public HttpResponse call() throws Exception { + if (futureCallback != null) { + try { + HttpResponse httpResponse = responseOrException(requestProducer); + futureCallback.completed(httpResponse); + } catch(Exception e) { + futureCallback.failed(e); } - //return the same headers that were sent - httpResponse.setHeaders(request.getAllHeaders()); - // Call the callback asynchronous to better simulate how async http client works - return exec.submit(new Callable() { - @Override - public HttpResponse call() { - return httpResponse; - } - }); + return null; } + return responseOrException(requestProducer); } }); + } + }); + return httpClient; + } - defaultHeaders = RestClientTestUtil.randomHeaders(getRandom(), "Header-default"); - node = new Node(new HttpHost("localhost", 9200)); - failureListener = new HostsTrackingFailureListener(); - strictDeprecationMode = randomBoolean(); - restClient = new RestClient(httpClient, defaultHeaders, - singletonList(node), null, failureListener, NodeSelector.ANY, strictDeprecationMode); + private static HttpResponse responseOrException(HttpAsyncRequestProducer requestProducer) throws Exception { + final HttpUriRequest request = (HttpUriRequest)requestProducer.generateRequest(); + final HttpHost httpHost = requestProducer.getTarget(); + //return the desired status code or exception depending on the path + switch (request.getURI().getPath()) { + case "/soe": + throw new SocketTimeoutException(httpHost.toString()); + case "/coe": + throw new ConnectTimeoutException(httpHost.toString()); + case "/ioe": + throw new IOException(httpHost.toString()); + case "/closed": + throw new ConnectionClosedException(); + case "/handshake": + throw new SSLHandshakeException(""); + case "/uri": + throw new URISyntaxException("", ""); + case "/runtime": + throw new RuntimeException(); + default: + int statusCode = Integer.parseInt(request.getURI().getPath().substring(1)); + StatusLine statusLine = new BasicStatusLine(new ProtocolVersion("http", 1, 1), statusCode, ""); + + final HttpResponse httpResponse = new BasicHttpResponse(statusLine); + //return the same body that was sent + if (request instanceof HttpEntityEnclosingRequest) { + HttpEntity entity = ((HttpEntityEnclosingRequest) request).getEntity(); + if (entity != null) { + assertTrue("the entity is not repeatable, cannot set it to the response directly", + entity.isRepeatable()); + httpResponse.setEntity(entity); + } + } + //return the same headers that were sent + httpResponse.setHeaders(request.getAllHeaders()); + return httpResponse; + } } /** @@ -208,10 +227,10 @@ public void testInternalHttpRequest() throws Exception { /** * End to end test for ok status codes */ - public void testOkStatusCodes() throws IOException { + public void testOkStatusCodes() throws Exception { for (String method : getHttpMethods()) { for (int okStatusCode : getOkStatusCodes()) { - Response response = restClient.performRequest(new Request(method, "/" + okStatusCode)); + Response response = performRequestSyncOrAsync(restClient, new Request(method, "/" + okStatusCode)); assertThat(response.getStatusLine().getStatusCode(), equalTo(okStatusCode)); } } @@ -221,7 +240,7 @@ public void testOkStatusCodes() throws IOException { /** * End to end test for error status codes: they should cause an exception to be thrown, apart from 404 with HEAD requests */ - public void testErrorStatusCodes() throws IOException { + public void testErrorStatusCodes() throws Exception { for (String method : getHttpMethods()) { Set expectedIgnores = new HashSet<>(); String ignoreParam = ""; @@ -269,7 +288,7 @@ public void testErrorStatusCodes() throws IOException { } } - public void testExceptions() throws IOException { + public void testPerformRequestIOExceptions() throws Exception { for (String method : getHttpMethods()) { //IOExceptions should be let bubble up try { @@ -283,8 +302,7 @@ public void testExceptions() throws IOException { try { restClient.performRequest(new Request(method, "/coe")); fail("request should have failed"); - } catch(IOException e) { - assertThat(e, instanceOf(ConnectTimeoutException.class)); + } catch(ConnectTimeoutException e) { // And we do all that so the thrown exception has our method in the stacktrace assertExceptionStackContainsCallingMethod(e); } @@ -292,8 +310,7 @@ public void testExceptions() throws IOException { try { restClient.performRequest(new Request(method, "/soe")); fail("request should have failed"); - } catch(IOException e) { - assertThat(e, instanceOf(SocketTimeoutException.class)); + } catch(SocketTimeoutException e) { // And we do all that so the thrown exception has our method in the stacktrace assertExceptionStackContainsCallingMethod(e); } @@ -301,8 +318,7 @@ public void testExceptions() throws IOException { try { restClient.performRequest(new Request(method, "/closed")); fail("request should have failed"); - } catch(IOException e) { - assertThat(e, instanceOf(ConnectionClosedException.class)); + } catch(ConnectionClosedException e) { // And we do all that so the thrown exception has our method in the stacktrace assertExceptionStackContainsCallingMethod(e); } @@ -310,26 +326,34 @@ public void testExceptions() throws IOException { try { restClient.performRequest(new Request(method, "/handshake")); fail("request should have failed"); - } catch(IOException e) { - assertThat(e, instanceOf(SSLHandshakeException.class)); + } catch(SSLHandshakeException e) { // And we do all that so the thrown exception has our method in the stacktrace assertExceptionStackContainsCallingMethod(e); } failureListener.assertCalled(singletonList(node)); + } + } + + public void testPerformRequestRuntimeExceptions() throws Exception { + for (String method : getHttpMethods()) { try { - restClient.performRequest(new Request(method, "/uri")); + restClient.performRequest(new Request(method, "/runtime")); fail("request should have failed"); } catch (RuntimeException e) { - assertThat(e.getCause(), instanceOf(URISyntaxException.class)); // And we do all that so the thrown exception has our method in the stacktrace assertExceptionStackContainsCallingMethod(e); } failureListener.assertCalled(singletonList(node)); + } + } + + public void testPerformRequestExceptions() throws Exception { + for (String method : getHttpMethods()) { try { - restClient.performRequest(new Request(method, "/runtime")); + restClient.performRequest(new Request(method, "/uri")); fail("request should have failed"); } catch (RuntimeException e) { - assertNull(e.getCause()); + assertThat(e.getCause(), instanceOf(URISyntaxException.class)); // And we do all that so the thrown exception has our method in the stacktrace assertExceptionStackContainsCallingMethod(e); } @@ -341,7 +365,7 @@ public void testExceptions() throws IOException { * End to end test for request and response body. Exercises the mock http client ability to send back * whatever body it has received. */ - public void testBody() throws IOException { + public void testBody() throws Exception { String body = "{ \"field\": \"value\" }"; StringEntity entity = new StringEntity(body, ContentType.APPLICATION_JSON); for (String method : Arrays.asList("DELETE", "GET", "PATCH", "POST", "PUT")) { @@ -370,7 +394,7 @@ public void testBody() throws IOException { Request request = new Request(method, "/" + randomStatusCode(getRandom())); request.setEntity(entity); try { - restClient.performRequest(request); + performRequestSyncOrAsync(restClient, request); fail("request should have failed"); } catch(UnsupportedOperationException e) { assertThat(e.getMessage(), equalTo(method + " with body is not supported")); @@ -382,7 +406,7 @@ public void testBody() throws IOException { * End to end test for request and response headers. Exercises the mock http client ability to send back * whatever headers it has received. */ - public void testHeaders() throws IOException { + public void testHeaders() throws Exception { for (String method : getHttpMethods()) { final Header[] requestHeaders = RestClientTestUtil.randomHeaders(getRandom(), "Header"); final int statusCode = randomStatusCode(getRandom()); @@ -394,7 +418,7 @@ public void testHeaders() throws IOException { request.setOptions(options); Response esResponse; try { - esResponse = restClient.performRequest(request); + esResponse = performRequestSyncOrAsync(restClient, request); } catch(ResponseException e) { esResponse = e.getResponse(); } @@ -404,7 +428,7 @@ public void testHeaders() throws IOException { } } - public void testDeprecationWarnings() throws IOException { + public void testDeprecationWarnings() throws Exception { String chars = randomAsciiAlphanumOfLength(5); assertDeprecationWarnings(singletonList("poorly formatted " + chars), singletonList("poorly formatted " + chars)); assertDeprecationWarnings(singletonList(formatWarning(chars)), singletonList(chars)); @@ -458,7 +482,7 @@ public boolean warningsShouldFailRequest(List warnings) { protected abstract WarningsHandler warningsHandler(); } - private void assertDeprecationWarnings(List warningHeaderTexts, List warningBodyTexts) throws IOException { + private void assertDeprecationWarnings(List warningHeaderTexts, List warningBodyTexts) throws Exception { String method = randomFrom(getHttpMethods()); Request request = new Request(method, "/200"); RequestOptions.Builder options = request.getOptions().toBuilder(); @@ -481,7 +505,7 @@ private void assertDeprecationWarnings(List warningHeaderTexts, List warningHeaderTexts, List exceptionRef = new AtomicReference<>(); + final AtomicReference responseRef = new AtomicReference<>(); + final CountDownLatch latch = new CountDownLatch(1); + restClient.performRequestAsync(request, new ResponseListener() { + @Override + public void onSuccess(Response response) { + responseRef.set(response); + latch.countDown(); + + } + + @Override + public void onFailure(Exception exception) { + exceptionRef.set(exception); + latch.countDown(); + } + }); + latch.await(); + if (exceptionRef.get() != null) { + throw exceptionRef.get(); + } + return responseRef.get(); + } + } + /** * Asserts that the provided {@linkplain Exception} contains the method * that called this somewhere on its stack. This is From 5812ecd91d62e6ac934da79aae2450eac57ccfe9 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 5 Feb 2019 14:15:32 +0100 Subject: [PATCH 11/13] add wrapping for interrupted exception --- .../src/main/java/org/elasticsearch/client/RestClient.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index 5e63cdd0776ab..e211de72af008 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -765,10 +765,13 @@ private static class ResponseOrResponseException { } /** - * Wrap whatever exception we received, copying the type where possible so the synchronous API looks as much as possible - * like the asynchronous API. We wrap the exception so that the caller's signature shows up in any exception we throw. + * Wrap whatever exception we may receive, copying the type where possible so the synchronous API looks as much as possible + * like the asynchronous API. We wrap the exception so that the caller's signature shows up in any exception we re-throw. */ private static Exception extractAndWrapCause(Exception exception) { + if (exception instanceof InterruptedException) { + throw new RuntimeException("thread waiting for the response was interrupted", exception); + } if (exception instanceof ExecutionException) { ExecutionException executionException = (ExecutionException)exception; Throwable t = executionException.getCause() == null ? executionException : executionException.getCause(); From 2a9e0d1a97ee55948f32b25beb4ca1dba9637e40 Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 5 Feb 2019 15:22:39 +0100 Subject: [PATCH 12/13] review --- .../src/main/java/org/elasticsearch/client/RestClient.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index e211de72af008..f0478742ecc82 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -233,7 +233,7 @@ private Response performRequest(final NodeTuple> nodeTuple, if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } - throw new IllegalStateException("cause must be either RuntimeException or IOException", cause); + throw new IllegalStateException("unexpected exception type: must be either RuntimeException or IOException", cause); } ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse); if (responseOrResponseException.responseException == null) { @@ -765,8 +765,8 @@ private static class ResponseOrResponseException { } /** - * Wrap whatever exception we may receive, copying the type where possible so the synchronous API looks as much as possible - * like the asynchronous API. We wrap the exception so that the caller's signature shows up in any exception we re-throw. + * Wrap the exception so the caller's signature shows up in the stack trace, taking care to copy the original type and message + * where possible so async and sync code don't have to check different exceptions. */ private static Exception extractAndWrapCause(Exception exception) { if (exception instanceof InterruptedException) { From ec0b0559704e328052baa282fe77b9781ba1822c Mon Sep 17 00:00:00 2001 From: Luca Cavanna Date: Tue, 5 Feb 2019 20:18:10 +0100 Subject: [PATCH 13/13] fix failing test --- .../indexlifecycle/TimeSeriesLifecycleActionsIT.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java index 24c1ab1c1cbf1..8f07b532769c3 100644 --- a/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java +++ b/x-pack/plugin/ilm/qa/multi-node/src/test/java/org/elasticsearch/xpack/indexlifecycle/TimeSeriesLifecycleActionsIT.java @@ -45,7 +45,6 @@ import java.io.IOException; import java.io.InputStream; -import java.io.UnsupportedEncodingException; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -670,16 +669,16 @@ public void testNonexistentPolicy() throws Exception { }); } - public void testInvalidPolicyNames() throws UnsupportedEncodingException { + public void testInvalidPolicyNames() { ResponseException ex; policy = randomAlphaOfLengthBetween(0,10) + "," + randomAlphaOfLengthBetween(0,10); ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction())); - assertThat(ex.getCause().getMessage(), containsString("invalid policy name")); + assertThat(ex.getMessage(), containsString("invalid policy name")); policy = randomAlphaOfLengthBetween(0,10) + "%20" + randomAlphaOfLengthBetween(0,10); ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction())); - assertThat(ex.getCause().getMessage(), containsString("invalid policy name")); + assertThat(ex.getMessage(), containsString("invalid policy name")); policy = "_" + randomAlphaOfLengthBetween(1, 20); ex = expectThrows(ResponseException.class, () -> createNewSingletonPolicy("delete", new DeleteAction())); @@ -716,7 +715,7 @@ public void testDeletePolicyInUse() throws IOException { Request deleteRequest = new Request("DELETE", "_ilm/policy/" + originalPolicy); ResponseException ex = expectThrows(ResponseException.class, () -> client().performRequest(deleteRequest)); - assertThat(ex.getCause().getMessage(), + assertThat(ex.getMessage(), Matchers.allOf( containsString("Cannot delete policy [" + originalPolicy + "]. It is in use by one or more indices: ["), containsString(managedIndex1),