diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java index a742fcb0f1a0f..d511aab047bfe 100644 --- a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClient.java @@ -8,6 +8,7 @@ package org.elasticsearch.client; +import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -67,17 +68,17 @@ import org.elasticsearch.client.core.TermVectorsRequest; import org.elasticsearch.client.core.TermVectorsResponse; import org.elasticsearch.client.tasks.TaskSubmissionResponse; -import org.elasticsearch.common.util.concurrent.FutureUtils; -import org.elasticsearch.core.CheckedConsumer; -import org.elasticsearch.core.CheckedFunction; -import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.Strings; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.util.concurrent.ListenableFuture; import org.elasticsearch.common.xcontent.ContextParser; import org.elasticsearch.common.xcontent.DeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.common.xcontent.ParseField; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; +import org.elasticsearch.core.CheckedConsumer; +import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.index.rankeval.RankEvalRequest; import org.elasticsearch.index.rankeval.RankEvalResponse; import org.elasticsearch.index.reindex.BulkByScrollResponse; @@ -130,18 +131,18 @@ import org.elasticsearch.search.aggregations.bucket.range.RangeAggregationBuilder; import org.elasticsearch.search.aggregations.bucket.sampler.InternalSampler; import org.elasticsearch.search.aggregations.bucket.sampler.ParsedSampler; +import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms; import org.elasticsearch.search.aggregations.bucket.terms.LongRareTerms; +import org.elasticsearch.search.aggregations.bucket.terms.LongTerms; +import org.elasticsearch.search.aggregations.bucket.terms.ParsedDoubleTerms; import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongRareTerms; +import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms; import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantLongTerms; import org.elasticsearch.search.aggregations.bucket.terms.ParsedSignificantStringTerms; import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringRareTerms; +import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms; import org.elasticsearch.search.aggregations.bucket.terms.SignificantLongTerms; import org.elasticsearch.search.aggregations.bucket.terms.SignificantStringTerms; -import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms; -import org.elasticsearch.search.aggregations.bucket.terms.LongTerms; -import org.elasticsearch.search.aggregations.bucket.terms.ParsedDoubleTerms; -import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms; -import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms; import org.elasticsearch.search.aggregations.bucket.terms.StringRareTerms; import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder; @@ -253,11 +254,16 @@ public class RestHighLevelClient implements Closeable { private static final Logger logger = LogManager.getLogger(RestHighLevelClient.class); + /** + * Environment variable determining whether to send the 7.x compatibility header + */ + public static final String API_VERSIONING_ENV_VARIABLE = "ELASTIC_CLIENT_APIVERSIONING"; // To be called using performClientRequest and performClientRequestAsync to ensure version compatibility check private final RestClient client; private final NamedXContentRegistry registry; private final CheckedConsumer doClose; + private final boolean useAPICompatibility; /** Do not access directly but through getVersionValidationFuture() */ private volatile ListenableFuture> versionValidationFuture; @@ -310,11 +316,28 @@ protected RestHighLevelClient(RestClientBuilder restClientBuilder, List doClose, List namedXContentEntries) { + this(restClient, doClose, namedXContentEntries, null); + } + + /** + * Creates a {@link RestHighLevelClient} given the low level {@link RestClient} that it should use to perform requests and + * a list of entries that allow to parse custom response sections added to Elasticsearch through plugins. + * This constructor can be called by subclasses in case an externally created low-level REST client needs to be provided. + * The consumer argument allows to control what needs to be done when the {@link #close()} method is called. + * Also subclasses can provide parsers for custom response sections added to Elasticsearch through plugins. + */ + protected RestHighLevelClient(RestClient restClient, CheckedConsumer doClose, + List namedXContentEntries, Boolean useAPICompatibility) { this.client = Objects.requireNonNull(restClient, "restClient must not be null"); this.doClose = Objects.requireNonNull(doClose, "doClose consumer must not be null"); this.registry = new NamedXContentRegistry( - Stream.of(getDefaultNamedXContents().stream(), getProvidedNamedXContents().stream(), namedXContentEntries.stream()) - .flatMap(Function.identity()).collect(toList())); + Stream.of(getDefaultNamedXContents().stream(), getProvidedNamedXContents().stream(), namedXContentEntries.stream()) + .flatMap(Function.identity()).collect(toList())); + if (useAPICompatibility == null && "true".equals(System.getenv(API_VERSIONING_ENV_VARIABLE))) { + this.useAPICompatibility = true; + } else { + this.useAPICompatibility = Boolean.TRUE.equals(useAPICompatibility); + } } /** @@ -2016,7 +2039,82 @@ protected static boolean convertExistsResponse(Response response) { return response.getStatusLine().getStatusCode() == 200; } + private enum EntityType { + JSON() { + @Override + public String header() { + return "application/json"; + } + @Override + public String compatibleHeader() { + return "application/vnd.elasticsearch+json; compatible-with=7"; + } + }, + NDJSON() { + @Override + public String header() { + return "application/x-ndjson"; + } + @Override + public String compatibleHeader() { + return "application/vnd.elasticsearch+x-ndjson; compatible-with=7"; + } + }, + STAR() { + @Override + public String header() { + return "application/*"; + } + @Override + public String compatibleHeader() { + return "application/vnd.elasticsearch+json; compatible-with=7"; + } + }, + YAML() { + @Override + public String header() { + return "application/yaml"; + } + @Override + public String compatibleHeader() { + return "application/vnd.elasticsearch+yaml; compatible-with=7"; + } + }, + SMILE() { + @Override + public String header() { + return "application/smile"; + } + @Override + public String compatibleHeader() { + return "application/vnd.elasticsearch+smile; compatible-with=7"; + } + }, + CBOR() { + @Override + public String header() { + return "application/cbor"; + } + @Override + public String compatibleHeader() { + return "application/vnd.elasticsearch+cbor; compatible-with=7"; + } + }; + + public abstract String header(); + public abstract String compatibleHeader(); + + @Override + public String toString() { + return header(); + } + } + private Cancellable performClientRequestAsync(Request request, ResponseListener listener) { + // Add compatibility request headers if compatibility mode has been enabled + if (this.useAPICompatibility) { + modifyRequestForCompatibility(request); + } ListenableFuture> versionCheck = getVersionValidationFuture(); @@ -2068,7 +2166,71 @@ public void onFailure(Exception e) { return result; }; + + /** + * Go through all the request's existing headers, looking for {@code headerName} headers and if they exist, + * changing them to use version compatibility. If no request headers are changed, modify the entity type header if appropriate + */ + boolean addCompatibilityFor(RequestOptions.Builder newOptions, Header entityHeader, String headerName) { + // Modify any existing "Content-Type" headers on the request to use the version compatibility, if available + boolean contentTypeModified = false; + for (Header header : new ArrayList<>(newOptions.getHeaders())) { + if (headerName.equalsIgnoreCase(header.getName()) == false) { + continue; + } + contentTypeModified = contentTypeModified || modifyHeader(newOptions, header, headerName); + } + + // If there were no request-specific headers, modify the request entity's header to be compatible + if (entityHeader != null && contentTypeModified == false) { + contentTypeModified = modifyHeader(newOptions, entityHeader, headerName); + } + + return contentTypeModified; + } + + /** + * Modify the given header to be version compatible, if necessary. + * Returns true if a modification was made, false otherwise. + */ + boolean modifyHeader(RequestOptions.Builder newOptions, Header header, String headerName) { + for (EntityType type : EntityType.values()) { + final String headerValue = header.getValue(); + if (headerValue.startsWith(type.header())) { + String newHeaderValue = headerValue.replace(type.header(), type.compatibleHeader()); + newOptions.removeHeader(header.getName()); + newOptions.addHeader(headerName, newHeaderValue); + return true; + } + } + return false; + } + + /** + * Make all necessary changes to support API compatibility for the given request. This includes + * modifying the "Content-Type" and "Accept" headers if present, or modifying the header based + * on the request's entity type. + */ + void modifyRequestForCompatibility(Request request) { + final Header entityHeader = request.getEntity() == null ? null : request.getEntity().getContentType(); + final RequestOptions.Builder newOptions = request.getOptions().toBuilder(); + + addCompatibilityFor(newOptions, entityHeader, "Content-Type"); + if (request.getOptions().containsHeader("Accept")) { + addCompatibilityFor(newOptions, entityHeader, "Accept"); + } else { + // There is no entity, and no existing accept header, but we still need one + // with compatibility, so use the compatible JSON (default output) format + newOptions.addHeader("Accept", EntityType.JSON.compatibleHeader()); + } + request.setOptions(newOptions); + } + private Response performClientRequest(Request request) throws IOException { + // Add compatibility request headers if compatibility mode has been enabled + if (this.useAPICompatibility) { + modifyRequestForCompatibility(request); + } Optional versionValidation; try { diff --git a/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClientBuilder.java b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClientBuilder.java new file mode 100644 index 0000000000000..df2724f1f1982 --- /dev/null +++ b/client/rest-high-level/src/main/java/org/elasticsearch/client/RestHighLevelClientBuilder.java @@ -0,0 +1,51 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.client; + +import org.elasticsearch.common.xcontent.NamedXContentRegistry; +import org.elasticsearch.core.CheckedConsumer; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Helper to build a {@link RestHighLevelClient}, allowing setting the low-level client that + * should be used as well as whether API compatibility should be used. + */ + +public class RestHighLevelClientBuilder { + private final RestClient restClient; + private CheckedConsumer closeHandler = RestClient::close; + private List namedXContentEntries = Collections.emptyList(); + private Boolean apiCompatibilityMode = null; + + public RestHighLevelClientBuilder(RestClient restClient) { + this.restClient = restClient; + } + + public RestHighLevelClientBuilder closeHandler(CheckedConsumer closeHandler) { + this.closeHandler = closeHandler; + return this; + } + + public RestHighLevelClientBuilder namedXContentEntries(List namedXContentEntries) { + this.namedXContentEntries = namedXContentEntries; + return this; + } + + public RestHighLevelClientBuilder setApiCompatibilityMode(Boolean enabled) { + this.apiCompatibilityMode = enabled; + return this; + } + + public RestHighLevelClient build() { + return new RestHighLevelClient(this.restClient, this.closeHandler, this.namedXContentEntries, this.apiCompatibilityMode); + } +} diff --git a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java index 45384216bd0d5..4f7511684f493 100644 --- a/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/elasticsearch/client/RestHighLevelClientTests.java @@ -10,6 +10,7 @@ import com.fasterxml.jackson.core.JsonParseException; +import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; import org.apache.http.HttpResponse; @@ -19,6 +20,8 @@ import org.apache.http.StatusLine; import org.apache.http.client.methods.HttpGet; import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; +import org.apache.http.message.BasicHeader; import org.apache.http.message.BasicHttpResponse; import org.apache.http.message.BasicRequestLine; import org.apache.http.message.BasicStatusLine; @@ -88,9 +91,7 @@ import org.elasticsearch.client.transform.transforms.TimeRetentionPolicyConfig; import org.elasticsearch.client.transform.transforms.TimeSyncConfig; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.core.CheckedFunction; import org.elasticsearch.common.bytes.BytesReference; -import org.elasticsearch.core.Tuple; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; @@ -100,6 +101,8 @@ import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.cbor.CborXContent; import org.elasticsearch.common.xcontent.smile.SmileXContent; +import org.elasticsearch.core.CheckedFunction; +import org.elasticsearch.core.Tuple; import org.elasticsearch.index.rankeval.DiscountedCumulativeGain; import org.elasticsearch.index.rankeval.EvaluationMetric; import org.elasticsearch.index.rankeval.ExpectedReciprocalRank; @@ -139,6 +142,7 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -147,6 +151,7 @@ import static org.hamcrest.CoreMatchers.endsWith; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.hasItems; import static org.mockito.Matchers.any; import static org.mockito.Matchers.argThat; @@ -1226,6 +1231,87 @@ public void testCancellationForwarding() throws Exception { verify(cancellable, times(1)).cancel(); } + public void testModifyHeader() { + RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); + assertTrue(restHighLevelClient.modifyHeader(builder, + new BasicHeader("Content-Type", "application/json; Charset=UTF-16"), "Content-Type")); + + assertThat(builder.getHeaders().stream().map(h -> h.getName() + "=>" + h.getValue()).collect(Collectors.joining(",")), + containsString("Content-Type=>application/vnd.elasticsearch+json; compatible-with=7; Charset=UTF-16")); + + builder = RequestOptions.DEFAULT.toBuilder(); + assertFalse(restHighLevelClient.modifyHeader(builder, new BasicHeader("Content-Type", "other"), "Content-Type")); + + assertThat(builder.getHeaders().stream().map(h -> h.getName() + "=>" + h.getValue()).collect(Collectors.joining(",")), + equalTo("")); + } + + public void testAddCompatibilityFor() { + RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder(); + Header entityHeader = new BasicHeader("Content-Type", "application/json"); + String headerName = "Content-Type"; + + // No request headers, use entity header + assertTrue(restHighLevelClient.addCompatibilityFor(builder, entityHeader, headerName)); + assertThat(builder.getHeaders().stream().map(h -> h.getName() + "=>" + h.getValue()).collect(Collectors.joining(",")), + containsString("Content-Type=>application/vnd.elasticsearch+json; compatible-with=7")); + + // Request has a header, ignore entity header + builder = RequestOptions.DEFAULT.toBuilder().addHeader("Content-Type", "application/yaml Charset=UTF-32"); + assertTrue(restHighLevelClient.addCompatibilityFor(builder, entityHeader, headerName)); + assertThat(builder.getHeaders().stream().map(h -> h.getName() + "=>" + h.getValue()).collect(Collectors.joining(",")), + containsString("Content-Type=>application/vnd.elasticsearch+yaml; compatible-with=7 Charset=UTF-32")); + + // Request has no headers, and no entity, no changes + builder = RequestOptions.DEFAULT.toBuilder(); + assertFalse(restHighLevelClient.addCompatibilityFor(builder, null, headerName)); + assertThat(builder.getHeaders().stream().map(h -> h.getName() + "=>" + h.getValue()).collect(Collectors.joining(",")), + equalTo("")); + } + + public void testModifyForCompatibility() { + final Function allHeaders = r -> + r.getOptions().getHeaders().stream().map(h -> h.getName() + "=>" + h.getValue()).collect(Collectors.joining(",")); + + Request req = new Request("POST", "/"); + + restHighLevelClient.modifyRequestForCompatibility(req); + + assertThat(allHeaders.apply(req), containsString("")); + + // With an entity + req = new Request("POST", "/"); + req.setEntity(new StringEntity("{}", ContentType.APPLICATION_JSON)); + restHighLevelClient.modifyRequestForCompatibility(req); + + assertThat(allHeaders.apply(req), + containsString("Content-Type=>application/vnd.elasticsearch+json; compatible-with=7; charset=UTF-8," + + "Accept=>application/vnd.elasticsearch+json; compatible-with=7")); + + // With "Content-Type" headers already set + req = new Request("POST", "/"); + req.setEntity(new StringEntity("{}", ContentType.TEXT_PLAIN)); + req.setOptions(RequestOptions.DEFAULT.toBuilder().addHeader("Content-Type", "application/json; Charset=UTF-16")); + restHighLevelClient.modifyRequestForCompatibility(req); + + assertThat(allHeaders.apply(req), + containsString("Content-Type=>application/vnd.elasticsearch+json; compatible-with=7; Charset=UTF-16," + + "Accept=>application/vnd.elasticsearch+json; compatible-with=7")); + + // With "Content-Type" and "Accept" headers already set + req = new Request("POST", "/"); + req.setEntity(new StringEntity("{}", ContentType.TEXT_PLAIN)); + req.setOptions(RequestOptions.DEFAULT.toBuilder() + .addHeader("Content-Type", "application/json; Charset=UTF-16") + .addHeader("Accept", "application/yaml; Charset=UTF-32")); + restHighLevelClient.modifyRequestForCompatibility(req); + + assertThat(allHeaders.apply(req), + containsString("Content-Type=>application/vnd.elasticsearch+json; compatible-with=7; Charset=UTF-16," + + "Accept=>application/vnd.elasticsearch+yaml; compatible-with=7; Charset=UTF-32")); + + } + private static void assertSyncMethod(Method method, String apiName, List booleanReturnMethods) { //A few methods return a boolean rather than a response object if (apiName.equals("ping") || apiName.contains("exist") || booleanReturnMethods.contains(apiName)) { diff --git a/client/rest/src/main/java/org/elasticsearch/client/RequestOptions.java b/client/rest/src/main/java/org/elasticsearch/client/RequestOptions.java index c5303cb18e055..6ddd3fa557966 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RequestOptions.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RequestOptions.java @@ -71,6 +71,13 @@ public List
getHeaders() { return headers; } + /** + * Return true if the options contain the given header + */ + public boolean containsHeader(String name) { + return headers.stream().anyMatch(h -> name.equalsIgnoreCase(h.getName())); + } + public Map getParameters() { return parameters; } @@ -202,6 +209,22 @@ public Builder addHeader(String name, String value) { return this; } + /** + * Remove all headers with the given name. + */ + public Builder removeHeader(String name) { + Objects.requireNonNull(name, "header name cannot be null"); + this.headers.removeIf(h -> name.equalsIgnoreCase(h.getName())); + return this; + } + + /** + * Return all headers for the request + */ + public List
getHeaders() { + return this.headers; + } + /** * Add the provided parameter to the request. */ diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java index b684682c412b1..c6a4236e9f841 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClient.java @@ -28,9 +28,9 @@ import org.apache.http.HttpResponse; import org.apache.http.client.AuthCache; import org.apache.http.client.ClientProtocolException; +import org.apache.http.client.config.RequestConfig; import org.apache.http.client.entity.GzipCompressingEntity; import org.apache.http.client.entity.GzipDecompressingEntity; -import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpEntityEnclosingRequestBase; import org.apache.http.client.methods.HttpHead; import org.apache.http.client.methods.HttpOptions; @@ -51,7 +51,6 @@ import org.apache.http.nio.protocol.HttpAsyncResponseConsumer; import org.apache.http.protocol.HTTP; -import javax.net.ssl.SSLHandshakeException; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.Closeable; @@ -81,6 +80,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import java.util.zip.GZIPOutputStream; +import javax.net.ssl.SSLHandshakeException; import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.Collections.singletonList; @@ -103,7 +103,6 @@ * Requests can be traced by enabling trace logging for "tracer". The trace logger outputs requests and responses in curl format. */ public class RestClient implements Closeable { - private static final Log logger = LogFactory.getLog(RestClient.class); private final CloseableHttpAsyncClient client; diff --git a/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java b/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java index ed3e7f392a773..27550663b0e3e 100644 --- a/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java +++ b/client/rest/src/main/java/org/elasticsearch/client/RestClientBuilder.java @@ -30,7 +30,6 @@ import org.apache.http.protocol.HttpContext; import org.apache.http.util.VersionInfo; -import javax.net.ssl.SSLContext; import java.io.IOException; import java.io.InputStream; import java.security.AccessController; @@ -40,6 +39,7 @@ import java.util.Locale; import java.util.Objects; import java.util.Properties; +import javax.net.ssl.SSLContext; /** * Helps creating a new {@link RestClient}. Allows to set the most common http client configuration options when internally