From 25b97c065555938e2a5a42ec09c4da0a74a9f1d1 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Fri, 25 Nov 2022 11:21:50 -0500 Subject: [PATCH] [FEATURE] Get rid of the dependency on OpenSearch core Signed-off-by: Andriy Redko --- .../httpclient5/ApacheHttpClient5Options.java | 266 +++++++ .../ApacheHttpClient5Transport.java | 729 ++++++++++++++++++ .../transport/httpclient5/DeadHostState.java | 125 +++ .../HttpAsyncResponseConsumerFactory.java | 86 +++ .../transport/httpclient5/Response.java | 214 +++++ .../httpclient5/ResponseException.java | 96 +++ .../httpclient5/WarningFailureException.java | 76 ++ .../httpclient5/WarningsHandler.java | 80 ++ .../HeapBufferedAsyncEntityConsumer.java | 139 ++++ .../HeapBufferedAsyncResponseConsumer.java | 123 +++ .../HttpEntityAsyncEntityProducer.java | 182 +++++ .../internal/HttpUriRequestProducer.java | 62 ++ .../transport/httpclient5/internal/Node.java | 289 +++++++ .../httpclient5/internal/NodeSelector.java | 105 +++ 14 files changed, 2572 insertions(+) create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Options.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/DeadHostState.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/HttpAsyncResponseConsumerFactory.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/Response.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/ResponseException.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningFailureException.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningsHandler.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncResponseConsumer.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpEntityAsyncEntityProducer.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpUriRequestProducer.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/Node.java create mode 100644 java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/NodeSelector.java diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Options.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Options.java new file mode 100644 index 0000000000..324017a814 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Options.java @@ -0,0 +1,266 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.client.transport.httpclient5; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Map.Entry; +import java.util.function.Function; +import java.util.stream.Collectors; + +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.message.BasicHeader; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.opensearch.client.transport.TransportOptions; +import org.opensearch.client.transport.Version; + +public class ApacheHttpClient5Options implements TransportOptions { + private static final String USER_AGENT = "User-Agent"; + + /** + * Default request options. + */ + public static final ApacheHttpClient5Options DEFAULT = new Builder( + Collections.emptyList(), + HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory.DEFAULT, + null, + null + ).build(); + + private final List
<Header> headers;
+    private final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
+    private final WarningsHandler warningsHandler;
+    private final RequestConfig requestConfig;
+
+    private ApacheHttpClient5Options(Builder builder) {
+        this.headers = Collections.unmodifiableList(new ArrayList<>(builder.headers));
+        this.httpAsyncResponseConsumerFactory = builder.httpAsyncResponseConsumerFactory;
+        this.warningsHandler = builder.warningsHandler;
+        this.requestConfig = builder.requestConfig;
+    }
+
+    public HttpAsyncResponseConsumerFactory getHttpAsyncResponseConsumerFactory() {
+        return httpAsyncResponseConsumerFactory;
+    }
+
+    public WarningsHandler getWarningsHandler() {
+        return warningsHandler;
+    }
+
+    public RequestConfig getRequestConfig() {
+        return requestConfig;
+    }
+
+    @Override
+    public Collection<Entry<String, String>> headers() {
+        return headers.stream()
+            .map(h -> new AbstractMap.SimpleImmutableEntry<>(h.getName(), h.getValue()))
+            .collect(Collectors.toList());
+    }
+
+    @Override
+    public Map<String, String> queryParameters() {
+        return null;
+    }
+
+    @Override
+    public Function<List<String>, Boolean> onWarnings() {
+        if (warningsHandler == null) {
+            return null;
+        } else {
+            return warnings -> warningsHandler.warningsShouldFailRequest(warnings);
+        }
+    }
+
+    @Override
+    public Builder toBuilder() {
+        return new Builder(headers, httpAsyncResponseConsumerFactory, warningsHandler, requestConfig);
+    }
+
+    public static class Builder implements TransportOptions.Builder {
+        private final List<Header> headers;
+        private HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory;
+        private WarningsHandler warningsHandler;
+        private RequestConfig requestConfig;
+
+        private Builder(Builder builder) {
+            this(builder.headers, builder.httpAsyncResponseConsumerFactory,
+                builder.warningsHandler, builder.requestConfig);
+        }
+
+        private Builder(
+            List<Header> headers,
+            HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
+            WarningsHandler warningsHandler,
+            RequestConfig requestConfig
+        ) {
+            this.headers = new ArrayList<>(headers);
+            this.httpAsyncResponseConsumerFactory = httpAsyncResponseConsumerFactory;
+            this.warningsHandler = warningsHandler;
+            this.requestConfig = requestConfig;
+        }
+
+        /**
+         * Add the provided header to the request.
+         *
+         * @param name the header name
+         * @param value the header value
+         * @throws NullPointerException if {@code name} or {@code value} is null.
+         */
+        @Override
+        public Builder addHeader(String name, String value) {
+            Objects.requireNonNull(name, "header name cannot be null");
+            Objects.requireNonNull(value, "header value cannot be null");
+            this.headers.add(new ReqHeader(name, value));
+            return this;
+        }
+
+        @Override
+        public TransportOptions.Builder setParameter(String name, String value) {
+            return this;
+        }
+
+        /**
+         * Called if there are warnings to determine if those warnings should fail the request.
+         */
+        @Override
+        public TransportOptions.Builder onWarnings(Function<List<String>, Boolean> listener) {
+            if (listener == null) {
+                setWarningsHandler(null);
+            } else {
+                setWarningsHandler(w -> {
+                    if (w != null && !w.isEmpty()) {
+                        return listener.apply(w);
+                    } else {
+                        return false;
+                    }
+                });
+            }
+
+            return this;
+        }
+
+        /**
+         * Set the {@link HttpAsyncResponseConsumerFactory} used to create one
+         * {@link AsyncResponseConsumer} callback per retry. Controls how the
+         * response body gets streamed from a non-blocking HTTP connection on the
+         * client side.
+         *
+         * @param httpAsyncResponseConsumerFactory factory for creating {@link AsyncResponseConsumer}.
+         * @throws NullPointerException if {@code httpAsyncResponseConsumerFactory} is null.
+         */
+        public void setHttpAsyncResponseConsumerFactory(HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory) {
+            this.httpAsyncResponseConsumerFactory = Objects.requireNonNull(
+                httpAsyncResponseConsumerFactory,
+                "httpAsyncResponseConsumerFactory cannot be null"
+            );
+        }
+
+        /**
+         * How this request should handle warnings. If null (the default) then
+         * this request will default to the behavior dictated by
+         * `setStrictDeprecationMode`.
+         * <p>
+         * This can be set to {@link WarningsHandler#PERMISSIVE} if the client
+         * should ignore all warnings, which is the same behavior as setting
+         * strictDeprecationMode to false. It can be set to
+         * {@link WarningsHandler#STRICT} if the client should fail if there are
+         * any warnings, which is the same behavior as setting
+         * strictDeprecationMode to true.
+         * <p>
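+         * For example, a fixed policy can be installed on the options builder
+         * (a usage sketch; {@code builder} is assumed to be in scope):
+         * <pre>{@code
+         * builder.setWarningsHandler(WarningsHandler.STRICT);
+         * }</pre>
+         * <p>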
+ * It can also be set to a custom implementation of + * {@linkplain WarningsHandler} to permit only certain warnings or to + * fail the request if the warnings returned don't + * exactly match some set. + * + * @param warningsHandler the {@link WarningsHandler} to be used + */ + public void setWarningsHandler(WarningsHandler warningsHandler) { + this.warningsHandler = warningsHandler; + } + + /** + * set RequestConfig, which can set socketTimeout, connectTimeout + * and so on by request + * @param requestConfig http client RequestConfig + * @return Builder + */ + public Builder setRequestConfig(RequestConfig requestConfig) { + this.requestConfig = requestConfig; + return this; + } + + @Override + public ApacheHttpClient5Options build() { + return new ApacheHttpClient5Options(this); + } + } + + static ApacheHttpClient5Options initialOptions() { + String ua = String.format( + Locale.ROOT, + "opensearch-java/%s (Java/%s)", + Version.VERSION == null ? "Unknown" : Version.VERSION.toString(), + System.getProperty("java.version") + ); + + return new ApacheHttpClient5Options( + DEFAULT.toBuilder() + .addHeader(USER_AGENT, ua) + .addHeader("Accept", ApacheHttpClient5Transport.JsonContentType.toString()) + ); + } + + static ApacheHttpClient5Options of(TransportOptions options) { + if (options instanceof ApacheHttpClient5Options) { + return (ApacheHttpClient5Options)options; + + } else { + final Builder builder = new Builder(DEFAULT.toBuilder()); + options.headers().forEach(h -> builder.addHeader(h.getKey(), h.getValue())); + options.queryParameters().forEach(builder::setParameter); + builder.onWarnings(options.onWarnings()); + return builder.build(); + } + } + + /** + * Custom implementation of {@link BasicHeader} that overrides equals and + * hashCode so it is easier to test equality of {@link ApacheHttpClient5Options}. + */ + static final class ReqHeader extends BasicHeader { + ReqHeader(String name, String value) { + super(name, value); + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other instanceof ReqHeader) { + Header otherHeader = (Header) other; + return Objects.equals(getName(), otherHeader.getName()) && Objects.equals(getValue(), otherHeader.getValue()); + } + return false; + } + + @Override + public int hashCode() { + return Objects.hash(getName(), getValue()); + } + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java new file mode 100644 index 0000000000..974882cd31 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ApacheHttpClient5Transport.java @@ -0,0 +1,729 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.client.transport.httpclient5; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import javax.annotation.Nullable; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hc.client5.http.auth.AuthCache; +import org.apache.hc.client5.http.auth.AuthScheme; +import org.apache.hc.client5.http.auth.AuthScope; +import org.apache.hc.client5.http.auth.Credentials; +import org.apache.hc.client5.http.auth.CredentialsProvider; +import org.apache.hc.client5.http.classic.methods.HttpHead; +import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase; +import org.apache.hc.client5.http.entity.GzipDecompressingEntity; +import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient; +import org.apache.hc.client5.http.impl.auth.BasicAuthCache; +import org.apache.hc.client5.http.impl.auth.BasicScheme; +import org.apache.hc.client5.http.protocol.HttpClientContext; +import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.io.entity.BufferedHttpEntity; +import org.apache.hc.core5.http.io.entity.ByteArrayEntity; +import org.apache.hc.core5.http.io.entity.EntityUtils; +import org.apache.hc.core5.http.message.RequestLine; +import org.apache.hc.core5.http.nio.AsyncRequestProducer; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.opensearch.client.json.JsonpDeserializer; +import org.opensearch.client.json.JsonpMapper; +import org.opensearch.client.json.NdJsonpSerializable; +import org.opensearch.client.opensearch._types.ErrorResponse; +import org.opensearch.client.opensearch._types.OpenSearchException; +import org.opensearch.client.transport.Endpoint; +import org.opensearch.client.transport.JsonEndpoint; +import org.opensearch.client.transport.OpenSearchTransport; +import org.opensearch.client.transport.TransportException; +import org.opensearch.client.transport.TransportOptions; +import org.opensearch.client.transport.endpoints.BooleanEndpoint; +import org.opensearch.client.transport.endpoints.BooleanResponse; +import org.opensearch.client.transport.httpclient5.internal.HttpUriRequestProducer; +import org.opensearch.client.transport.httpclient5.internal.Node; +import org.opensearch.client.transport.httpclient5.internal.NodeSelector; +import org.opensearch.client.util.MissingRequiredPropertyException; + +import jakarta.json.stream.JsonGenerator; +import jakarta.json.stream.JsonParser; + +public class ApacheHttpClient5Transport implements OpenSearchTransport { + private static final Log logger = LogFactory.getLog(ApacheHttpClient5Transport.class); + 
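+
+    // A minimal construction sketch (illustrative only; the node list and mapper are
+    // supplied by the caller, and the async client must be started before use):
+    //
+    //   CloseableHttpAsyncClient httpClient = HttpAsyncClients.createDefault();
+    //   httpClient.start();
+    //   OpenSearchTransport transport = new ApacheHttpClient5Transport(
+    //       httpClient, nodes, new JsonbJsonpMapper(), true);
+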
static final ContentType JsonContentType = ContentType.APPLICATION_JSON; + + private final JsonpMapper mapper; + private final CloseableHttpAsyncClient client; + private final ApacheHttpClient5Options transportOptions; + private final ConcurrentMap denylist = new ConcurrentHashMap<>(); + private final AtomicInteger lastNodeIndex = new AtomicInteger(0); + private volatile NodeTuple> nodeTuple; + private final NodeSelector nodeSelector = NodeSelector.ANY; + private final WarningsHandler warningsHandler; + + public ApacheHttpClient5Transport(final CloseableHttpAsyncClient client, final List nodes, final JsonpMapper mapper, + @Nullable TransportOptions options, boolean strictDeprecationMode) { + this.mapper = mapper; + this.client = client; + this.transportOptions = (options == null) ? ApacheHttpClient5Options.initialOptions() : ApacheHttpClient5Options.of(options); + this.warningsHandler = strictDeprecationMode ? WarningsHandler.STRICT : WarningsHandler.PERMISSIVE; + setNodes(nodes); + } + + public ApacheHttpClient5Transport(final CloseableHttpAsyncClient client, final List nodes, + final JsonpMapper mapper, final boolean strictDeprecationMode) { + this(client, nodes, mapper, null, strictDeprecationMode); + } + + @Override + public ResponseT performRequest(RequestT request, + Endpoint endpoint, TransportOptions options) throws IOException { + return performRequestAsync(request, endpoint, options).join(); + } + + @Override + public CompletableFuture performRequestAsync(RequestT request, + Endpoint endpoint, TransportOptions options) { + + final ApacheHttpClient5Options requestOptions = (options == null) ? transportOptions : ApacheHttpClient5Options.of(options); + final CompletableFuture future = new CompletableFuture<>(); + final HttpUriRequestBase clientReq = prepareLowLevelRequest(request, endpoint, options); + final WarningsHandler warningsHandler = (requestOptions.getWarningsHandler() == null) ? 
+ this.warningsHandler : requestOptions.getWarningsHandler(); + + try { + performRequestAsync(nextNodes(), requestOptions, clientReq, warningsHandler, future); + } catch(final IOException ex) { + future.completeExceptionally(ex); + } + + return future.thenApply(r -> { + try { + return (ResponseT)prepareResponse(r, endpoint); + } catch (final IOException ex) { + throw new CompletionException(ex); + } + }); + } + + @Override + public JsonpMapper jsonpMapper() { + return mapper; + } + + @Override + public TransportOptions options() { + return transportOptions; + } + + @Override + public void close() throws IOException { + client.close(); + } + + private void performRequestAsync(final NodeTuple> nodeTuple, final ApacheHttpClient5Options options, + final HttpUriRequestBase request, final WarningsHandler warningsHandler, final CompletableFuture listener) { + final RequestContext context = createContextForNextAttempt(options, request, nodeTuple.nodes.next(), nodeTuple.authCache); + Future future = client.execute( + context.requestProducer, + context.asyncResponseConsumer, + context.context, + new FutureCallback() { + @Override + public void completed(ClassicHttpResponse httpResponse) { + try { + ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, + httpResponse, warningsHandler); + if (responseOrResponseException.responseException == null) { + listener.complete(responseOrResponseException.response); + } else { + if (nodeTuple.nodes.hasNext()) { + performRequestAsync(nodeTuple, options, request, warningsHandler, listener); + } else { + listener.completeExceptionally(responseOrResponseException.responseException); + } + } + } catch (Exception e) { + listener.completeExceptionally(e); + } + } + + @Override + public void failed(Exception failure) { + try { + onFailure(context.node); + if (nodeTuple.nodes.hasNext()) { + performRequestAsync(nodeTuple, options, request, warningsHandler, listener); + } else { + listener.completeExceptionally(failure); + } + } catch (Exception e) { + listener.completeExceptionally(e); + } + } + + @Override + public void cancelled() { + listener.completeExceptionally(new CancellationException("request was cancelled")); + } + } + ); + + if (future instanceof org.apache.hc.core5.concurrent.Cancellable) { + request.setDependency((org.apache.hc.core5.concurrent.Cancellable) future); + } + } + + /** + * Replaces the nodes with which the client communicates. + * + * @param nodes the new nodes to communicate with. + */ + private void setNodes(Collection nodes) { + if (nodes == null || nodes.isEmpty()) { + throw new IllegalArgumentException("nodes must not be null or empty"); + } + AuthCache authCache = new BasicAuthCache(); + + Map nodesByHost = new LinkedHashMap<>(); + for (Node node : nodes) { + Objects.requireNonNull(node, "node cannot be null"); + // TODO should we throw an IAE if we have two nodes with the same host? 
+ nodesByHost.put(node.getHost(), node); + authCache.put(node.getHost(), new BasicScheme()); + } + this.nodeTuple = new NodeTuple<>(Collections.unmodifiableList(new ArrayList<>(nodesByHost.values())), authCache); + this.denylist.clear(); + } + + private ResponseOrResponseException convertResponse(final HttpUriRequestBase request, final Node node, + final ClassicHttpResponse httpResponse, final WarningsHandler warningsHandler) throws IOException { + int statusCode = httpResponse.getCode(); + + Optional.ofNullable(httpResponse.getEntity()) + .map(HttpEntity::getContentEncoding) + .filter("gzip"::equalsIgnoreCase) + .map(gzipHeaderValue -> new GzipDecompressingEntity(httpResponse.getEntity())) + .ifPresent(httpResponse::setEntity); + + Response response = new Response(new RequestLine(request), node.getHost(), httpResponse); + Set ignoreErrorCodes = getIgnoreErrorCodes("400,401,403,404,405", request.getMethod()); + if (isSuccessfulResponse(statusCode) || ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) { + onResponse(node); + if (warningsHandler.warningsShouldFailRequest(response.getWarnings())) { + throw new WarningFailureException(response); + } + return new ResponseOrResponseException(response); + } + ResponseException responseException = new ResponseException(response); + if (isRetryStatus(statusCode)) { + // mark host dead and retry against next one + onFailure(node); + return new ResponseOrResponseException(responseException); + } + // mark host alive and don't retry, as the error should be a request problem + onResponse(node); + throw responseException; + } + + private static Set getIgnoreErrorCodes(String ignoreString, String requestMethod) { + Set ignoreErrorCodes; + if (ignoreString == null) { + if (HttpHead.METHOD_NAME.equals(requestMethod)) { + // 404 never causes error if returned for a HEAD request + ignoreErrorCodes = Collections.singleton(404); + } else { + ignoreErrorCodes = Collections.emptySet(); + } + } else { + String[] ignoresArray = ignoreString.split(","); + ignoreErrorCodes = new HashSet<>(); + if (HttpHead.METHOD_NAME.equals(requestMethod)) { + // 404 never causes error if returned for a HEAD request + ignoreErrorCodes.add(404); + } + for (String ignoreCode : ignoresArray) { + try { + ignoreErrorCodes.add(Integer.valueOf(ignoreCode)); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e); + } + } + } + return ignoreErrorCodes; + } + + private static boolean isSuccessfulResponse(int statusCode) { + return statusCode < 300; + } + + private static boolean isRetryStatus(int statusCode) { + switch (statusCode) { + case 502: + case 503: + case 504: + return true; + } + return false; + } + + /** + * Returns a non-empty {@link Iterator} of nodes to be used for a request + * that match the {@link NodeSelector}. + *
+     * <p>
+ * If there are no living nodes that match the {@link NodeSelector} + * this will return the dead node that matches the {@link NodeSelector} + * that is closest to being revived. + * @throws IOException if no nodes are available + */ + private NodeTuple> nextNodes() throws IOException { + NodeTuple> nodeTuple = this.nodeTuple; + Iterable hosts = selectNodes(nodeTuple, denylist, lastNodeIndex, nodeSelector); + return new NodeTuple<>(hosts.iterator(), nodeTuple.authCache); + } + + /** + * Select nodes to try and sorts them so that the first one will be tried initially, then the following ones + * if the previous attempt failed and so on. Package private for testing. + */ + static Iterable selectNodes( + NodeTuple> nodeTuple, + Map denylist, + AtomicInteger lastNodeIndex, + NodeSelector nodeSelector + ) throws IOException { + /* + * Sort the nodes into living and dead lists. + */ + List livingNodes = new ArrayList<>(Math.max(0, nodeTuple.nodes.size() - denylist.size())); + List deadNodes = new ArrayList<>(denylist.size()); + for (Node node : nodeTuple.nodes) { + DeadHostState deadness = denylist.get(node.getHost()); + if (deadness == null || deadness.shallBeRetried()) { + livingNodes.add(node); + } else { + deadNodes.add(new DeadNode(node, deadness)); + } + } + + if (false == livingNodes.isEmpty()) { + /* + * Normal state: there is at least one living node. If the + * selector is ok with any over the living nodes then use them + * for the request. + */ + List selectedLivingNodes = new ArrayList<>(livingNodes); + nodeSelector.select(selectedLivingNodes); + if (false == selectedLivingNodes.isEmpty()) { + /* + * Rotate the list using a global counter as the distance so subsequent + * requests will try the nodes in a different order. + */ + Collections.rotate(selectedLivingNodes, lastNodeIndex.getAndIncrement()); + return selectedLivingNodes; + } + } + + /* + * Last resort: there are no good nodes to use, either because + * the selector rejected all the living nodes or because there aren't + * any living ones. Either way, we want to revive a single dead node + * that the NodeSelectors are OK with. We do this by passing the dead + * nodes through the NodeSelector so it can have its say in which nodes + * are ok. If the selector is ok with any of the nodes then we will take + * the one in the list that has the lowest revival time and try it. + */ + if (false == deadNodes.isEmpty()) { + final List selectedDeadNodes = new ArrayList<>(deadNodes); + /* + * We'd like NodeSelectors to remove items directly from deadNodes + * so we can find the minimum after it is filtered without having + * to compare many things. This saves us a sort on the unfiltered + * list. + */ + nodeSelector.select(() -> new DeadNodeIteratorAdapter(selectedDeadNodes.iterator())); + if (false == selectedDeadNodes.isEmpty()) { + return Collections.singletonList(Collections.min(selectedDeadNodes).node); + } + } + throw new IOException( + "NodeSelector [" + nodeSelector + "] rejected all nodes, " + "living " + livingNodes + " and dead " + deadNodes + ); + } + + /** + * Called after each failed attempt. + * Receives as an argument the host that was used for the failed attempt. 
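+     * Each consecutive failure pushes the retry deadline further out, following the
+     * backoff tracked by {@link DeadHostState}.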
+ */ + private void onFailure(Node node) { + while (true) { + DeadHostState previousDeadHostState = denylist.putIfAbsent( + node.getHost(), + new DeadHostState(DeadHostState.DEFAULT_TIME_SUPPLIER) + ); + if (previousDeadHostState == null) { + if (logger.isDebugEnabled()) { + logger.debug("added [" + node + "] to denylist"); + } + break; + } + if (denylist.replace(node.getHost(), previousDeadHostState, new DeadHostState(previousDeadHostState))) { + if (logger.isDebugEnabled()) { + logger.debug("updated [" + node + "] already in denylist"); + } + break; + } + } + } + + private RequestContext createContextForNextAttempt(final ApacheHttpClient5Options options, + final HttpUriRequestBase request, final Node node, final AuthCache authCache) { + request.reset(); + + if (options.getRequestConfig() != null) { + request.setConfig(options.getRequestConfig()); + } + + return new RequestContext(options, request, node, authCache); + } + + private ResponseT prepareResponse(Response clientResp, + Endpoint endpoint + ) throws IOException { + + try { + int statusCode = clientResp.getStatusLine().getStatusCode(); + + if (endpoint.isError(statusCode)) { + JsonpDeserializer errorDeserializer = endpoint.errorDeserializer(statusCode); + if (errorDeserializer == null) { + throw new TransportException( + "Request failed with status code '" + statusCode + "'", + new ResponseException(clientResp) + ); + } + + HttpEntity entity = clientResp.getEntity(); + if (entity == null) { + throw new TransportException( + "Expecting a response body, but none was sent", + new ResponseException(clientResp) + ); + } + + // We may have to replay it. + entity = new BufferedHttpEntity(entity); + + try { + InputStream content = entity.getContent(); + try (JsonParser parser = mapper.jsonProvider().createParser(content)) { + ErrorT error = errorDeserializer.deserialize(parser, mapper); + // TODO: have the endpoint provide the exception constructor + throw new OpenSearchException((ErrorResponse) error); + } + } catch(MissingRequiredPropertyException errorEx) { + // Could not decode exception, try the response type + try { + ResponseT response = decodeResponse(statusCode, entity, clientResp, endpoint); + return response; + } catch(Exception respEx) { + // No better luck: throw the original error decoding exception + throw new TransportException("Failed to decode error response", new ResponseException(clientResp)); + } + } + } else { + return decodeResponse(statusCode, clientResp.getEntity(), clientResp, endpoint); + } + } finally { + EntityUtils.consume(clientResp.getEntity()); + } + } + + private HttpUriRequestBase prepareLowLevelRequest( + RequestT request, + Endpoint endpoint, + @Nullable TransportOptions options + ) { + String method = endpoint.method(request); + String path = endpoint.requestUrl(request); + Map params = endpoint.queryParameters(request); + + final URI uri = URI.create(path + (params.isEmpty() ? 
"" : params + .entrySet() + .stream() + .map(e -> e.getKey() + "=" + e.getValue()) + .collect(Collectors.joining("&")))); + final HttpUriRequestBase clientReq = new HttpUriRequestBase(method, uri); + if (endpoint.hasRequestBody()) { + // Request has a body and must implement JsonpSerializable or NdJsonpSerializable + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + + if (request instanceof NdJsonpSerializable) { + writeNdJson((NdJsonpSerializable) request, baos); + } else { + JsonGenerator generator = mapper.jsonProvider().createGenerator(baos); + mapper.serialize(request, generator); + generator.close(); + } + + clientReq.setEntity(new ByteArrayEntity(baos.toByteArray(), JsonContentType)); + } + + return clientReq; + } + + /** + * Called after each successful request call. + * Receives as an argument the host that was used for the successful request. + */ + private void onResponse(Node node) { + DeadHostState removedHost = this.denylist.remove(node.getHost()); + if (logger.isDebugEnabled() && removedHost != null) { + logger.debug("removed [" + node + "] from denylist"); + } + } + + private ResponseT decodeResponse( + int statusCode, @Nullable HttpEntity entity, Response clientResp, Endpoint endpoint + ) throws IOException { + + if (endpoint instanceof BooleanEndpoint) { + BooleanEndpoint bep = (BooleanEndpoint) endpoint; + + @SuppressWarnings("unchecked") + ResponseT response = (ResponseT) new BooleanResponse(bep.getResult(statusCode)); + return response; + + } else if (endpoint instanceof JsonEndpoint){ + @SuppressWarnings("unchecked") + JsonEndpoint jsonEndpoint = (JsonEndpoint)endpoint; + // Successful response + ResponseT response = null; + JsonpDeserializer responseParser = jsonEndpoint.responseDeserializer(); + if (responseParser != null) { + // Expecting a body + if (entity == null) { + throw new TransportException( + "Expecting a response body, but none was sent", + new ResponseException(clientResp) + ); + } + InputStream content = entity.getContent(); + try (JsonParser parser = mapper.jsonProvider().createParser(content)) { + response = responseParser.deserialize(parser, mapper); + }; + } + return response; + } else { + throw new TransportException("Unhandled endpoint type: '" + endpoint.getClass().getName() + "'"); + } + } + + /** + * {@link NodeTuple} enables the {@linkplain Node}s and {@linkplain AuthCache} + * to be set together in a thread safe, volatile way. + */ + static class NodeTuple { + final T nodes; + final AuthCache authCache; + + NodeTuple(final T nodes, final AuthCache authCache) { + this.nodes = nodes; + this.authCache = authCache; + } + } + + /** + * Contains a reference to a denylisted node and the time until it is + * revived. We use this so we can do a single pass over the denylist. + */ + private static class DeadNode implements Comparable { + final Node node; + final DeadHostState deadness; + + DeadNode(Node node, DeadHostState deadness) { + this.node = node; + this.deadness = deadness; + } + + @Override + public String toString() { + return node.toString(); + } + + @Override + public int compareTo(DeadNode rhs) { + return deadness.compareTo(rhs.deadness); + } + } + + /** + * Adapts an Iterator<DeadNodeAndRevival> into an + * Iterator<Node>. 
+     */
+    private static class DeadNodeIteratorAdapter implements Iterator<Node> {
+        private final Iterator<DeadNode> itr;
+
+        private DeadNodeIteratorAdapter(Iterator<DeadNode> itr) {
+            this.itr = itr;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return itr.hasNext();
+        }
+
+        @Override
+        public Node next() {
+            return itr.next().node;
+        }
+
+        @Override
+        public void remove() {
+            itr.remove();
+        }
+    }
+
+    /**
+     * Write an nd-json value by serializing each of its items on a separate line, recursing if its items themselves implement
+     * {@link NdJsonpSerializable} to flatten nested structures.
+     */
+    private void writeNdJson(NdJsonpSerializable value, ByteArrayOutputStream baos) {
+        Iterator<?> values = value._serializables();
+        while (values.hasNext()) {
+            Object item = values.next();
+            if (item instanceof NdJsonpSerializable && item != value) { // do not recurse on the item itself
+                writeNdJson((NdJsonpSerializable) item, baos);
+            } else {
+                JsonGenerator generator = mapper.jsonProvider().createGenerator(baos);
+                mapper.serialize(item, generator);
+                generator.close();
+                baos.write('\n');
+            }
+        }
+    }
+
+    private static class RequestContext {
+        private final Node node;
+        private final AsyncRequestProducer requestProducer;
+        private final AsyncResponseConsumer<ClassicHttpResponse> asyncResponseConsumer;
+        private final HttpClientContext context;
+
+        RequestContext(final ApacheHttpClient5Options options, final HttpUriRequestBase request,
+                final Node node, final AuthCache authCache) {
+            this.node = node;
+            this.requestProducer = HttpUriRequestProducer.create(request, node.getHost());
+            this.asyncResponseConsumer = options
+                .getHttpAsyncResponseConsumerFactory()
+                .createHttpAsyncResponseConsumer();
+            this.context = HttpClientContext.create();
+            context.setAuthCache(new WrappingAuthCache(context, authCache));
+        }
+    }
+
+    /**
+     * The Apache HttpClient 5 adds the "Authorization" header even if the credentials for basic authentication are not
+     * provided (effectively, username and password are 'null'). To work around that, the AuthCache is wrapped around the
+     * current HttpClientContext to ensure that credentials are indeed provided for the particular HttpHost; otherwise no
+     * authentication scheme is returned even if one is present in the cache.
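+     * <p>
+     * The wrapped cache therefore only takes effect when real credentials are configured
+     * on the client, for example (a sketch; host and secrets are assumed):
+     * <pre>{@code
+     * final BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+     * credentialsProvider.setCredentials(new AuthScope(host), new UsernamePasswordCredentials("admin", password));
+     * HttpAsyncClients.custom().setDefaultCredentialsProvider(credentialsProvider).build();
+     * }</pre>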
+ */ + private static class WrappingAuthCache implements AuthCache { + private final HttpClientContext context; + private final AuthCache delegate; + private final boolean usePersistentCredentials = true; + + WrappingAuthCache(HttpClientContext context, AuthCache delegate) { + this.context = context; + this.delegate = delegate; + } + + @Override + public void put(HttpHost host, AuthScheme authScheme) { + delegate.put(host, authScheme); + } + + @Override + public AuthScheme get(HttpHost host) { + AuthScheme authScheme = delegate.get(host); + + if (authScheme != null) { + final CredentialsProvider credsProvider = context.getCredentialsProvider(); + if (credsProvider != null) { + final String schemeName = authScheme.getName(); + final AuthScope authScope = new AuthScope(host, null, schemeName); + final Credentials creds = credsProvider.getCredentials(authScope, context); + + // See please https://issues.apache.org/jira/browse/HTTPCLIENT-2203 + if (authScheme instanceof BasicScheme) { + ((BasicScheme) authScheme).initPreemptive(creds); + } + + if (creds == null) { + return null; + } + } + } + + return authScheme; + } + + @Override + public void remove(HttpHost host) { + if (!usePersistentCredentials) { + delegate.remove(host); + } + } + + @Override + public void clear() { + delegate.clear(); + } + + } + + private static class ResponseOrResponseException { + private final Response response; + private final ResponseException responseException; + + ResponseOrResponseException(Response response) { + this.response = Objects.requireNonNull(response); + this.responseException = null; + } + + ResponseOrResponseException(ResponseException responseException) { + this.responseException = Objects.requireNonNull(responseException); + this.response = null; + } + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/DeadHostState.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/DeadHostState.java new file mode 100644 index 0000000000..f186d2033d --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/DeadHostState.java @@ -0,0 +1,125 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +/** + * Holds the state of a dead connection to a host. 
Keeps track of how many failed attempts were performed and
+ * when the host should be retried (based on number of previous failed attempts).
+ * Class is immutable, a new copy of it should be created each time the state has to be changed.
+ */
+final class DeadHostState implements Comparable<DeadHostState> {
+
+    private static final long MIN_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(1);
+    static final long MAX_CONNECTION_TIMEOUT_NANOS = TimeUnit.MINUTES.toNanos(30);
+    static final Supplier<Long> DEFAULT_TIME_SUPPLIER = System::nanoTime;
+
+    private final int failedAttempts;
+    private final long deadUntilNanos;
+    private final Supplier<Long> timeSupplier;
+
+    /**
+     * Build the initial dead state of a host. Useful when a working host stops functioning
+     * and needs to be marked dead after its first failure. In such case the host will be retried after a minute or so.
+     *
+     * @param timeSupplier a way to supply the current time and allow for unit testing
+     */
+    DeadHostState(Supplier<Long> timeSupplier) {
+        this.failedAttempts = 1;
+        this.deadUntilNanos = timeSupplier.get() + MIN_CONNECTION_TIMEOUT_NANOS;
+        this.timeSupplier = timeSupplier;
+    }
+
+    /**
+     * Build the dead state of a host given its previous dead state. Useful when a host has been failing before, hence
+     * it already failed for one or more consecutive times. The more failed attempts we register the longer we wait
+     * to retry that same host again. Minimum is 1 minute (for a node that only failed once, created
+     * through {@link #DeadHostState(Supplier)}), maximum is 30 minutes (for a node that failed more than 10 consecutive times)
+     *
+     * @param previousDeadHostState the previous state of the host which allows us to increase the wait till the next retry attempt
+     */
+    DeadHostState(DeadHostState previousDeadHostState) {
+        long timeoutNanos = (long) Math.min(
+            MIN_CONNECTION_TIMEOUT_NANOS * 2 * Math.pow(2, previousDeadHostState.failedAttempts * 0.5 - 1),
+            MAX_CONNECTION_TIMEOUT_NANOS
+        );
+        this.deadUntilNanos = previousDeadHostState.timeSupplier.get() + timeoutNanos;
+        this.failedAttempts = previousDeadHostState.failedAttempts + 1;
+        this.timeSupplier = previousDeadHostState.timeSupplier;
+    }
+
+    /**
+     * Indicates whether it's time to retry the failed host or not.
+     *
+     * @return true if the host should be retried, false otherwise
+     */
+    boolean shallBeRetried() {
+        return timeSupplier.get() - deadUntilNanos > 0;
+    }
+
+    /**
+     * Returns the timestamp (nanos) till the host is supposed to stay dead without being retried.
+     * After that the host should be retried.
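+     * <p>
+     * With the default time supplier, the wait before retry grows per consecutive failure
+     * roughly as 1m, 1.4m, 2m, 2.8m, ... and is capped at 30m (an illustration of the
+     * timeout formula in {@link #DeadHostState(DeadHostState)}).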
+ */ + long getDeadUntilNanos() { + return deadUntilNanos; + } + + int getFailedAttempts() { + return failedAttempts; + } + + @Override + public int compareTo(DeadHostState other) { + if (timeSupplier != other.timeSupplier) { + throw new IllegalArgumentException( + "can't compare DeadHostStates holding different time suppliers as they may " + "be based on different clocks" + ); + } + return Long.compare(deadUntilNanos, other.deadUntilNanos); + } + + @Override + public String toString() { + return "DeadHostState{" + + "failedAttempts=" + + failedAttempts + + ", deadUntilNanos=" + + deadUntilNanos + + ", timeSupplier=" + + timeSupplier + + '}'; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/HttpAsyncResponseConsumerFactory.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/HttpAsyncResponseConsumerFactory.java new file mode 100644 index 0000000000..348a466da9 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/HttpAsyncResponseConsumerFactory.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.nio.AsyncResponseConsumer; +import org.opensearch.client.nio.HeapBufferedAsyncResponseConsumer; + +/** + * Factory used to create instances of {@link AsyncResponseConsumer}. Each request retry needs its own instance of the + * consumer object. Users can implement this interface and pass their own instance to the specialized + * performRequest methods that accept an {@link HttpAsyncResponseConsumerFactory} instance as argument. + */ +public interface HttpAsyncResponseConsumerFactory { + + /** + * Creates the default type of {@link AsyncResponseConsumer}, based on heap buffering with a buffer limit of 100MB. + */ + HttpAsyncResponseConsumerFactory DEFAULT = new HeapBufferedResponseConsumerFactory( + HeapBufferedResponseConsumerFactory.DEFAULT_BUFFER_LIMIT); + + /** + * Creates the {@link AsyncResponseConsumer}, called once per request attempt. + */ + AsyncResponseConsumer createHttpAsyncResponseConsumer(); + + /** + * Default factory used to create instances of {@link AsyncResponseConsumer}. + * Creates one instance of {@link HeapBufferedAsyncResponseConsumer} for each request attempt, with a configurable + * buffer limit which defaults to 100MB. 
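+     * <p>
+     * For example, to cap buffered responses at 10MB instead (a usage sketch):
+     * <pre>{@code
+     * new HeapBufferedResponseConsumerFactory(10 * 1024 * 1024)
+     * }</pre>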
+ */ + class HeapBufferedResponseConsumerFactory implements HttpAsyncResponseConsumerFactory { + + // default buffer limit is 100MB + static final int DEFAULT_BUFFER_LIMIT = 100 * 1024 * 1024; + + private final int bufferLimit; + + /** + * Creates a {@link HeapBufferedResponseConsumerFactory} instance with the given buffer limit. + * + * @param bufferLimitBytes the buffer limit to be applied to this instance + */ + public HeapBufferedResponseConsumerFactory(int bufferLimitBytes) { + this.bufferLimit = bufferLimitBytes; + } + + /** + * Creates the {@link AsyncResponseConsumer}, called once per request attempt. + */ + @Override + public AsyncResponseConsumer createHttpAsyncResponseConsumer() { + return new HeapBufferedAsyncResponseConsumer(bufferLimit); + } + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/Response.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/Response.java new file mode 100644 index 0000000000..2f36d517de --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/Response.java @@ -0,0 +1,214 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import org.apache.hc.core5.http.ClassicHttpResponse; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.HttpResponse; +import org.apache.hc.core5.http.message.RequestLine; +import org.apache.hc.core5.http.message.StatusLine; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Holds an opensearch response. It wraps the {@link HttpResponse} returned and associates it with + * its corresponding {@link RequestLine} and {@link HttpHost}. 
+ */ +final class Response { + + private final RequestLine requestLine; + private final HttpHost host; + private final ClassicHttpResponse response; + + Response(RequestLine requestLine, HttpHost host, ClassicHttpResponse response) { + Objects.requireNonNull(requestLine, "requestLine cannot be null"); + Objects.requireNonNull(host, "host cannot be null"); + Objects.requireNonNull(response, "response cannot be null"); + this.requestLine = requestLine; + this.host = host; + this.response = response; + } + + /** + * Returns the request line that generated this response + */ + public RequestLine getRequestLine() { + return requestLine; + } + + /** + * Returns the node that returned this response + */ + public HttpHost getHost() { + return host; + } + + /** + * Returns the status line of the current response + */ + public StatusLine getStatusLine() { + return new StatusLine(response); + } + + /** + * Returns all the response headers + */ + public Header[] getHeaders() { + return response.getHeaders(); + } + + /** + * Returns the value of the first header with a specified name of this message. + * If there is more than one matching header in the message the first element is returned. + * If there is no matching header in the message null is returned. + * + * @param name header name + */ + public String getHeader(String name) { + Header header = response.getFirstHeader(name); + if (header == null) { + return null; + } + return header.getValue(); + } + + /** + * Returns the response body available, null otherwise + * @see HttpEntity + */ + public HttpEntity getEntity() { + return response.getEntity(); + } + + /** + * Optimized regular expression to test if a string matches the RFC 1123 date + * format (with quotes and leading space). Start/end of line characters and + * atomic groups are used to prevent backtracking. + */ + private static final Pattern WARNING_HEADER_DATE_PATTERN = Pattern.compile("^ " + // start of line, leading space + // quoted RFC 1123 date format + "\"" + // opening quote + "(?>Mon|Tue|Wed|Thu|Fri|Sat|Sun), " + // day of week, atomic group to prevent backtracking + "\\d{2} " + // 2-digit day + "(?>Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec) " + // month, atomic group to prevent backtracking + "\\d{4} " + // 4-digit year + "\\d{2}:\\d{2}:\\d{2} " + // (two-digit hour):(two-digit minute):(two-digit second) + "GMT" + // GMT + "\"$"); // closing quote (optional, since an older version can still send a warn-date), end of line + + /** + * Length of RFC 1123 format (with quotes and leading space), used in + * matchWarningHeaderPatternByPrefix(String). + */ + private static final int WARNING_HEADER_DATE_LENGTH = 0 + 1 + 1 + 3 + 1 + 1 + 2 + 1 + 3 + 1 + 4 + 1 + 2 + 1 + 2 + 1 + 2 + 1 + 3 + 1; + + /** + * Tests if a string matches the RFC 7234 specification for warning headers. + * This assumes that the warn code is always 299 and the warn agent is always + * OpenSearch. + * + * @param s the value of a warning header formatted according to RFC 7234 + * @return {@code true} if the input string matches the specification + */ + private static boolean matchWarningHeaderPatternByPrefix(final String s) { + return s.startsWith("299 OpenSearch-"); + } + + /** + * Refer to org.opensearch.common.logging.DeprecationLogger + */ + private static String extractWarningValueFromWarningHeader(final String s) { + String warningHeader = s; + + /* + * The following block tests for the existence of a RFC 1123 date in the warning header. 
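+         * A complete warning value looks like, for example (an assumed sample):
+         * 299 OpenSearch-2.4.0 "this request accesses system indices" "Fri, 25 Nov 2022 16:21:50 GMT"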
If the date exists, it is removed for + * extractWarningValueFromWarningHeader(String) to work properly (as it does not handle dates). + */ + if (s.length() > WARNING_HEADER_DATE_LENGTH) { + final String possibleDateString = s.substring(s.length() - WARNING_HEADER_DATE_LENGTH); + final Matcher matcher = WARNING_HEADER_DATE_PATTERN.matcher(possibleDateString); + + if (matcher.matches()) { + warningHeader = warningHeader.substring(0, s.length() - WARNING_HEADER_DATE_LENGTH); + } + } + + final int firstQuote = warningHeader.indexOf('\"'); + final int lastQuote = warningHeader.length() - 1; + final String warningValue = warningHeader.substring(firstQuote + 1, lastQuote); + return warningValue; + } + + /** + * Returns a list of all warning headers returned in the response. + */ + public List getWarnings() { + List warnings = new ArrayList<>(); + for (Header header : response.getHeaders("Warning")) { + String warning = header.getValue(); + if (matchWarningHeaderPatternByPrefix(warning)) { + warnings.add(extractWarningValueFromWarningHeader(warning)); + } else { + warnings.add(warning); + } + } + return warnings; + } + + /** + * Returns true if there is at least one warning header returned in the + * response. + */ + public boolean hasWarnings() { + Header[] warnings = response.getHeaders("Warning"); + return warnings != null && warnings.length > 0; + } + + ClassicHttpResponse getHttpResponse() { + return response; + } + + /** + * Convert response to string representation + */ + @Override + public String toString() { + return "Response{" + "requestLine=" + requestLine + ", host=" + host + ", response=" + getStatusLine() + '}'; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ResponseException.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ResponseException.java new file mode 100644 index 0000000000..930f0dd849 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/ResponseException.java @@ -0,0 +1,96 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package org.opensearch.client.transport.httpclient5; + +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.ParseException; +import org.apache.hc.core5.http.io.entity.BufferedHttpEntity; +import org.apache.hc.core5.http.io.entity.EntityUtils; + +import java.io.IOException; +import java.util.Locale; + +/** + * Exception thrown when an opensearch node responds to a request with a status code that indicates an error. + * Holds the response that was returned. + */ +final class ResponseException extends IOException { + + private final Response response; + + /** + * Creates a ResponseException containing the given {@code Response}. + * + * @param response The error response. + */ + ResponseException(Response response) throws IOException { + super(buildMessage(response)); + this.response = response; + } + + static String buildMessage(Response response) throws IOException { + String message = String.format( + Locale.ROOT, + "method [%s], host [%s], URI [%s], status line [%s]", + response.getRequestLine().getMethod(), + response.getHost(), + response.getRequestLine().getUri(), + response.getStatusLine().toString() + ); + + if (response.hasWarnings()) { + message += "\n" + "Warnings: " + response.getWarnings(); + } + + HttpEntity entity = response.getEntity(); + if (entity != null) { + if (entity.isRepeatable() == false) { + entity = new BufferedHttpEntity(entity); + response.getHttpResponse().setEntity(entity); + } + try { + message += "\n" + EntityUtils.toString(entity); + } catch (final ParseException ex) { + throw new IOException(ex); + } + } + return message; + } + + /** + * Returns the {@link Response} that caused this exception to be thrown. + */ + public Response getResponse() { + return response; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningFailureException.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningFailureException.java new file mode 100644 index 0000000000..8627fd9d0c --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningFailureException.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
+ */ + +package org.opensearch.client.transport.httpclient5; + +import java.io.IOException; + +/** + * This exception is used to indicate that one or more {@link Response#getWarnings()} exist + * and is typically used when the {@link ApacheHttpClient5Transport} is set to fail by passing + * `true` to `strictDeprecationMode`. + */ +// This class extends RuntimeException in order to deal with wrapping that is done in FutureUtils on exception. +// if the exception is not of type OpenSearchException or RuntimeException it will be wrapped in a UncategorizedExecutionException +public final class WarningFailureException extends RuntimeException { + private final Response response; + + /** + * Creates a {@link WarningFailureException} instance. + * + * @param response the response that contains warnings. + * @throws IOException if there is a problem building the exception message. + */ + public WarningFailureException(Response response) throws IOException { + super(ResponseException.buildMessage(response)); + this.response = response; + } + + /** + * Wrap a {@linkplain WarningFailureException} with another one with the current + * stack trace. This is used during synchronous calls so that the caller + * ends up in the stack trace of the exception thrown. + * + * @param e the exception to be wrapped. + */ + WarningFailureException(WarningFailureException e) { + super(e.getMessage(), e); + this.response = e.getResponse(); + } + + /** + * Returns the {@link Response} that caused this exception to be thrown. + */ + public Response getResponse() { + return response; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningsHandler.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningsHandler.java new file mode 100644 index 0000000000..1a0b73d07d --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/WarningsHandler.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5; + +import java.util.List; + +/** + * Called if there are warnings to determine if those warnings should fail the + * request. + */ +public interface WarningsHandler { + + /** + * Determines whether the given list of warnings should fail the request. + * + * @param warnings a list of warnings. + * @return boolean indicating if the request should fail. 
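
Callers that enable strictDeprecationMode can catch this exception, log the warnings, and still use the response. A hypothetical sketch: the Callable wrapper is an assumption for illustration, and getWarnings() is treated as the List<String> of messages produced by the parsing code above:

import java.util.concurrent.Callable;

import org.opensearch.client.transport.httpclient5.Response;
import org.opensearch.client.transport.httpclient5.WarningFailureException;

public class WarningAwareCaller {
    // 'doRequest' stands in for any synchronous call routed through
    // ApacheHttpClient5Transport with strictDeprecationMode enabled (assumption).
    static Response callTolerantly(Callable<Response> doRequest) throws Exception {
        try {
            return doRequest.call();
        } catch (WarningFailureException e) {
            // The server answered successfully; the request was failed locally because of warnings.
            for (String warning : e.getResponse().getWarnings()) {
                System.err.println("deprecation warning: " + warning);
            }
            return e.getResponse();
        }
    }
}
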
+ */ + boolean warningsShouldFailRequest(List warnings); + + /** + * The permissive warnings handler. Warnings will not fail the request. + */ + WarningsHandler PERMISSIVE = new WarningsHandler() { + @Override + public boolean warningsShouldFailRequest(List warnings) { + return false; + } + + @Override + public String toString() { + return "permissive"; + } + }; + + /** + * The strict warnings handler. Warnings will fail the request. + */ + WarningsHandler STRICT = new WarningsHandler() { + @Override + public boolean warningsShouldFailRequest(List warnings) { + return false == warnings.isEmpty(); + } + + @Override + public String toString() { + return "strict"; + } + }; +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java new file mode 100644 index 0000000000..aa4d73a237 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncEntityConsumer.java @@ -0,0 +1,139 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.client.transport.httpclient5.internal; + +import org.apache.hc.core5.http.ContentTooLongException; +import org.apache.hc.core5.http.ContentType; +import org.apache.hc.core5.http.HttpException; +import org.apache.hc.core5.http.nio.AsyncEntityConsumer; +import org.apache.hc.core5.http.nio.entity.AbstractBinAsyncEntityConsumer; +import org.apache.hc.core5.util.ByteArrayBuffer; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicReference; + +/** + * Default implementation of {@link AsyncEntityConsumer}. Buffers the whole + * response content in heap memory, meaning that the size of the buffer is equal to the content-length of the response. + * Limits the size of responses that can be read based on a configurable argument. Throws an exception in case the entity is longer + * than the configured buffer limit. + */ +public class HeapBufferedAsyncEntityConsumer extends AbstractBinAsyncEntityConsumer { + + private final int bufferLimitBytes; + private AtomicReference bufferRef = new AtomicReference<>(); + + /** + * Creates a new instance of this consumer with the provided buffer limit. + * + * @param bufferLimit the buffer limit. Must be greater than 0. + * @throws IllegalArgumentException if {@code bufferLimit} is less than or equal to 0. + */ + public HeapBufferedAsyncEntityConsumer(int bufferLimit) { + if (bufferLimit <= 0) { + throw new IllegalArgumentException("bufferLimit must be greater than 0"); + } + this.bufferLimitBytes = bufferLimit; + } + + /** + * Get the limit of the buffer. + */ + public int getBufferLimit() { + return bufferLimitBytes; + } + + /** + * Triggered to signal beginning of entity content stream. + * + * @param contentType the entity content type + */ + @Override + protected void streamStart(final ContentType contentType) throws HttpException, IOException {} + + /** + * Triggered to obtain the capacity increment. + * + * @return the number of bytes this consumer is prepared to process. 
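
Beyond the built-in PERMISSIVE and STRICT constants shown above, custom handlers can implement policies in between. A hypothetical example that fails the request only on warnings announcing removals; the string heuristic is illustrative, and the List<String> element type is an assumption:

import java.util.List;

import org.opensearch.client.transport.httpclient5.WarningsHandler;

public class FailOnRemovalHandler implements WarningsHandler {
    // Fail only when a warning announces an outright removal (illustrative heuristic).
    @Override
    public boolean warningsShouldFailRequest(List<String> warnings) {
        return warnings.stream().anyMatch(w -> w.contains("has been removed"));
    }

    @Override
    public String toString() {
        return "fail-on-removal";
    }
}
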
+ */ + @Override + protected int capacityIncrement() { + return Integer.MAX_VALUE; + } + + /** + * Triggered to pass incoming data packet to the data consumer. + * + * @param src the data packet. + * @param endOfStream flag indicating whether this data packet is the last in the data stream. + * + */ + @Override + protected void data(final ByteBuffer src, final boolean endOfStream) throws IOException { + if (src == null) { + return; + } + + ByteArrayBuffer buffer = bufferRef.get(); + if (buffer == null) { + buffer = new ByteArrayBuffer(bufferLimitBytes); + if (bufferRef.compareAndSet(null, buffer) == false) { + buffer = bufferRef.get(); + } + } + + int len = src.limit(); + if (buffer.length() + len > bufferLimitBytes) { + throw new ContentTooLongException( + "entity content is too long [" + len + "] for the configured buffer limit [" + bufferLimitBytes + "]" + ); + } + + if (len < 0) { + len = 4096; + } + + if (src.hasArray()) { + buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining()); + } else { + while (src.hasRemaining()) { + buffer.append(src.get()); + } + } + } + + /** + * Triggered to generate entity representation. + * + * @return the entity content + */ + @Override + protected byte[] generateContent() throws IOException { + final ByteArrayBuffer buffer = bufferRef.get(); + return buffer == null ? new byte[0] : buffer.toByteArray(); + } + + /** + * Release resources being held + */ + @Override + public void releaseResources() { + ByteArrayBuffer buffer = bufferRef.getAndSet(null); + if (buffer != null) { + buffer.clear(); + buffer = null; + } + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncResponseConsumer.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncResponseConsumer.java new file mode 100644 index 0000000000..4d0ceb66e7 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HeapBufferedAsyncResponseConsumer.java @@ -0,0 +1,123 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
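
Because the consumer buffers everything on the heap, the limit check in data() above is the only guard against oversized responses. A test-style sketch that exercises the limit by subclassing to reach the protected data() callback (class name and sizes are illustrative):

import org.apache.hc.core5.http.ContentTooLongException;
import org.opensearch.client.transport.httpclient5.internal.HeapBufferedAsyncEntityConsumer;

import java.nio.ByteBuffer;

// Subclass only to expose the protected data() callback for demonstration.
public class BufferLimitDemo extends HeapBufferedAsyncEntityConsumer {
    BufferLimitDemo() {
        super(16); // 16-byte limit
    }

    public static void main(String[] args) throws Exception {
        BufferLimitDemo consumer = new BufferLimitDemo();
        try {
            consumer.data(ByteBuffer.wrap(new byte[32]), true); // 32 bytes > 16-byte limit
        } catch (ContentTooLongException expected) {
            System.out.println(expected.getMessage());
        } finally {
            consumer.releaseResources();
        }
    }
}
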
+ */
+
+package org.opensearch.client.transport.httpclient5.internal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hc.core5.http.ClassicHttpResponse;
+import org.apache.hc.core5.http.ContentType;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.HttpException;
+import org.apache.hc.core5.http.HttpHeaders;
+import org.apache.hc.core5.http.HttpResponse;
+import org.apache.hc.core5.http.io.entity.ByteArrayEntity;
+import org.apache.hc.core5.http.message.BasicClassicHttpResponse;
+import org.apache.hc.core5.http.nio.AsyncResponseConsumer;
+import org.apache.hc.core5.http.nio.support.AbstractAsyncResponseConsumer;
+import org.apache.hc.core5.http.protocol.HttpContext;
+
+import java.io.IOException;
+
+/**
+ * Default implementation of {@link AsyncResponseConsumer}. Buffers the whole
+ * response content in heap memory, meaning that the size of the buffer is equal to the content-length of the response.
+ * Limits the size of responses that can be read based on a configurable argument. Throws an exception in case the entity is longer
+ * than the configured buffer limit.
+ */
+public class HeapBufferedAsyncResponseConsumer extends AbstractAsyncResponseConsumer<ClassicHttpResponse, byte[]> {
+    private static final Log LOGGER = LogFactory.getLog(HeapBufferedAsyncResponseConsumer.class);
+    private final int bufferLimit;
+
+    /**
+     * Creates a new instance of this consumer with the provided buffer limit.
+     *
+     * @param bufferLimit the buffer limit. Must be greater than 0.
+     * @throws IllegalArgumentException if {@code bufferLimit} is less than or equal to 0.
+     */
+    public HeapBufferedAsyncResponseConsumer(int bufferLimit) {
+        super(new HeapBufferedAsyncEntityConsumer(bufferLimit));
+        this.bufferLimit = bufferLimit;
+    }
+
+    /**
+     * Get the limit of the buffer.
+     */
+    public int getBufferLimit() {
+        return bufferLimit;
+    }
+
+    /**
+     * Triggered to signal receipt of an intermediate (1xx) HTTP response.
+     *
+     * @param response the intermediate (1xx) HTTP response.
+     * @param context the actual execution context.
+     */
+    @Override
+    public void informationResponse(final HttpResponse response, final HttpContext context) throws HttpException, IOException {}
+
+    /**
+     * Triggered to generate an object that represents the result of response message processing.
+     * @param response the response message.
+     * @param entity the response entity.
+     * @param contentType the response content type.
+     * @return the result of response processing.
+ */ + @Override + protected ClassicHttpResponse buildResult(final HttpResponse response, final byte[] entity, final ContentType contentType) { + final ClassicHttpResponse classicResponse = new BasicClassicHttpResponse(response.getCode()); + classicResponse.setVersion(response.getVersion()); + classicResponse.setHeaders(response.getHeaders()); + classicResponse.setReasonPhrase(response.getReasonPhrase()); + if (response.getLocale() != null) { + classicResponse.setLocale(response.getLocale()); + } + + if (entity != null) { + String encoding = null; + + try { + final Header contentEncoding = response.getHeader(HttpHeaders.CONTENT_ENCODING); + if (contentEncoding != null) { + encoding = contentEncoding.getValue(); + } + } catch (final HttpException ex) { + LOGGER.debug("Unable to detect content encoding", ex); + } + + final ByteArrayEntity httpEntity = new ByteArrayEntity(entity, contentType, encoding); + classicResponse.setEntity(httpEntity); + } + + return classicResponse; + } +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpEntityAsyncEntityProducer.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpEntityAsyncEntityProducer.java new file mode 100644 index 0000000000..1c669a55c7 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpEntityAsyncEntityProducer.java @@ -0,0 +1,182 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.client.transport.httpclient5.internal; + +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.DataStreamChannel; +import org.apache.hc.core5.http.nio.ResourceHolder; +import org.apache.hc.core5.util.Args; +import org.apache.hc.core5.util.Asserts; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +/** + * The {@link AsyncEntityProducer} implementation for {@link HttpEntity} + */ +public class HttpEntityAsyncEntityProducer implements AsyncEntityProducer { + + private final HttpEntity entity; + private final ByteBuffer byteBuffer; + private final boolean chunked; + private final AtomicReference exception; + private final AtomicReference channelRef; + private boolean eof; + + /** + * Create new async HTTP entity producer + * @param entity HTTP entity + * @param bufferSize buffer size + */ + public HttpEntityAsyncEntityProducer(final HttpEntity entity, final int bufferSize) { + this.entity = Args.notNull(entity, "Http Entity"); + this.byteBuffer = ByteBuffer.allocate(bufferSize); + this.chunked = entity.isChunked(); + this.exception = new AtomicReference<>(); + this.channelRef = new AtomicReference<>(); + } + + /** + * Create new async HTTP entity producer with default buffer size (8192 bytes) + * @param entity HTTP entity + */ + public HttpEntityAsyncEntityProducer(final HttpEntity entity) { + this(entity, 8192); + } + + /** + * Determines whether the producer can consistently produce the same content + * after invocation of {@link ResourceHolder#releaseResources()}. 
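
A producer wrapping a repeatable entity can be resubmitted on retry, which is why the metadata accessors below simply delegate to the wrapped entity. A quick usage sketch (the request body is made up):

import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.hc.core5.http.nio.AsyncEntityProducer;
import org.opensearch.client.transport.httpclient5.internal.HttpEntityAsyncEntityProducer;

public class ProducerDemo {
    public static void main(String[] args) {
        StringEntity entity = new StringEntity("{\"query\":{\"match_all\":{}}}");
        // Default 8192-byte staging buffer; pass a size explicitly for large bodies.
        AsyncEntityProducer producer = new HttpEntityAsyncEntityProducer(entity);
        System.out.println("repeatable=" + producer.isRepeatable()
            + ", length=" + producer.getContentLength()
            + ", contentType=" + producer.getContentType());
    }
}
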
+ */
+    @Override
+    public boolean isRepeatable() {
+        return entity.isRepeatable();
+    }
+
+    /**
+     * Returns content type of the entity, if known.
+     */
+    @Override
+    public String getContentType() {
+        return entity.getContentType();
+    }
+
+    /**
+     * Returns length of the entity, if known.
+     */
+    @Override
+    public long getContentLength() {
+        return entity.getContentLength();
+    }
+
+    /**
+     * Returns the number of bytes immediately available for output.
+     * This method can be used as a hint to control output events
+     * of the underlying I/O session.
+     *
+     * @return the number of bytes immediately available for output
+     */
+    @Override
+    public int available() {
+        return Integer.MAX_VALUE;
+    }
+
+    /**
+     * Returns content encoding of the entity, if known.
+     */
+    @Override
+    public String getContentEncoding() {
+        return entity.getContentEncoding();
+    }
+
+    /**
+     * Returns chunked transfer hint for this entity.
+     * <p>
+     * The behavior of wrapping entities is implementation dependent,
+     * but should respect the primary purpose.
+     * </p>
+ */ + @Override + public boolean isChunked() { + return chunked; + } + + /** + * Preliminary declaration of trailing headers. + */ + @Override + public Set getTrailerNames() { + return entity.getTrailerNames(); + } + + /** + * Triggered to signal the ability of the underlying data channel + * to accept more data. The data producer can choose to write data + * immediately inside the call or asynchronously at some later point. + * + * @param channel the data channel capable to accepting more data. + */ + @Override + public void produce(final DataStreamChannel channel) throws IOException { + ReadableByteChannel stream = channelRef.get(); + if (stream == null) { + stream = Channels.newChannel(entity.getContent()); + Asserts.check(channelRef.getAndSet(stream) == null, "Illegal producer state"); + } + if (!eof) { + final int bytesRead = stream.read(byteBuffer); + if (bytesRead < 0) { + eof = true; + } + } + if (byteBuffer.position() > 0) { + byteBuffer.flip(); + channel.write(byteBuffer); + byteBuffer.compact(); + } + if (eof && byteBuffer.position() == 0) { + channel.endStream(); + releaseResources(); + } + } + + /** + * Triggered to signal a failure in data generation. + * + * @param cause the cause of the failure. + */ + @Override + public void failed(final Exception cause) { + if (exception.compareAndSet(null, cause)) { + releaseResources(); + } + } + + /** + * Release resources being held + */ + @Override + public void releaseResources() { + eof = false; + final ReadableByteChannel stream = channelRef.getAndSet(null); + if (stream != null) { + try { + stream.close(); + } catch (final IOException ex) { + /* Close quietly */ + } + } + } + +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpUriRequestProducer.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpUriRequestProducer.java new file mode 100644 index 0000000000..b9940009c7 --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/HttpUriRequestProducer.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
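
produce() above follows the standard NIO fill/flip/write/compact cycle: read into the buffer, flip to drain it toward the channel, then compact to preserve any bytes the channel did not accept. A self-contained, JDK-only illustration of the same cycle, printing instead of writing to a DataStreamChannel:

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;

public class FlipCompactDemo {
    public static void main(String[] args) throws IOException {
        ReadableByteChannel source = Channels.newChannel(
            new ByteArrayInputStream("hello, opensearch".getBytes(StandardCharsets.UTF_8)));
        ByteBuffer buffer = ByteBuffer.allocate(4); // tiny buffer to force several rounds
        boolean eof = false;
        while (!(eof && buffer.position() == 0)) {
            if (!eof && source.read(buffer) < 0) {
                eof = true; // no more input; keep draining what is still buffered
            }
            buffer.flip(); // switch to draining mode
            while (buffer.hasRemaining()) {
                System.out.print((char) buffer.get()); // stand-in for channel.write(buffer)
            }
            buffer.compact(); // back to filling mode, preserving unwritten bytes
        }
        source.close();
        System.out.println();
    }
}
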
+ */ + +package org.opensearch.client.transport.httpclient5.internal; + +import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase; +import org.apache.hc.core5.http.HttpEntity; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.nio.AsyncEntityProducer; +import org.apache.hc.core5.http.nio.support.BasicRequestProducer; +import org.apache.hc.core5.net.URIAuthority; +import org.apache.hc.core5.util.Args; + +/** + * The producer of the {@link HttpUriRequestBase} instances associated with a particular {@link HttpHost} + */ +public class HttpUriRequestProducer extends BasicRequestProducer { + private final HttpUriRequestBase request; + + HttpUriRequestProducer(final HttpUriRequestBase request, final AsyncEntityProducer entityProducer) { + super(request, entityProducer); + this.request = request; + } + + /** + * Get the produced {@link HttpUriRequestBase} instance + * @return produced {@link HttpUriRequestBase} instance + */ + public HttpUriRequestBase getRequest() { + return request; + } + + /** + * Create new request producer for {@link HttpUriRequestBase} instance and {@link HttpHost} + * @param request {@link HttpUriRequestBase} instance + * @param host {@link HttpHost} instance + * @return new request producer + */ + public static HttpUriRequestProducer create(final HttpUriRequestBase request, final HttpHost host) { + Args.notNull(request, "Request"); + Args.notNull(host, "HttpHost"); + + // TODO: Should we copy request here instead of modifying in place? + request.setAuthority(new URIAuthority(host)); + request.setScheme(host.getSchemeName()); + + final HttpEntity entity = request.getEntity(); + AsyncEntityProducer entityProducer = null; + + if (entity != null) { + entityProducer = new HttpEntityAsyncEntityProducer(entity); + } + + return new HttpUriRequestProducer(request, entityProducer); + } + +} diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/Node.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/Node.java new file mode 100644 index 0000000000..8f55f67cde --- /dev/null +++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/Node.java @@ -0,0 +1,289 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. 
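
create() stamps the target authority and scheme onto the request in place (see the TODO above) and wraps any request body in an HttpEntityAsyncEntityProducer. A usage sketch (host and path are made up):

import java.net.URI;

import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
import org.apache.hc.core5.http.HttpHost;
import org.opensearch.client.transport.httpclient5.internal.HttpUriRequestProducer;

public class RequestProducerDemo {
    public static void main(String[] args) {
        HttpUriRequestBase request = new HttpUriRequestBase("GET", URI.create("/_cluster/health"));
        HttpHost host = new HttpHost("http", "localhost", 9200);
        // After create(), the request carries the host's authority and scheme.
        HttpUriRequestProducer producer = HttpUriRequestProducer.create(request, host);
        System.out.println(producer.getRequest());
    }
}
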
+ */
+
+package org.opensearch.client.transport.httpclient5.internal;
+
+import org.apache.hc.core5.http.HttpHost;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Metadata about an {@link HttpHost} running OpenSearch.
+ */
+public class Node {
+    /**
+     * Address that this host claims is its primary contact point.
+     */
+    private final HttpHost host;
+    /**
+     * Addresses on which the host is listening. These are useful to have
+     * around because they allow you to find a host based on any address it
+     * is listening on.
+     */
+    private final Set<HttpHost> boundHosts;
+    /**
+     * Name of the node as configured by the {@code node.name} attribute.
+     */
+    private final String name;
+    /**
+     * Version of OpenSearch that the node is running or {@code null}
+     * if we don't know the version.
+     */
+    private final String version;
+    /**
+     * Roles that the OpenSearch process on the host has or {@code null}
+     * if we don't know what roles the node has.
+     */
+    private final Roles roles;
+    /**
+     * Attributes declared on the node.
+     */
+    private final Map<String, List<String>> attributes;
+
+    /**
+     * Create a {@linkplain Node} with metadata. All parameters except
+     * {@code host} are nullable and implementations of {@link NodeSelector}
+     * need to decide what to do in their absence.
+     *
+     * @param host primary host address
+     * @param boundHosts addresses on which the host is listening
+     * @param name name of the node
+     * @param version version of OpenSearch
+     * @param roles roles that the OpenSearch process has on the host
+     * @param attributes attributes declared on the node
+     */
+    public Node(HttpHost host, Set<HttpHost> boundHosts, String name, String version, Roles roles, Map<String, List<String>> attributes) {
+        if (host == null) {
+            throw new IllegalArgumentException("host cannot be null");
+        }
+        this.host = host;
+        this.boundHosts = boundHosts;
+        this.name = name;
+        this.version = version;
+        this.roles = roles;
+        this.attributes = attributes;
+    }
+
+    /**
+     * Create a {@linkplain Node} without any metadata.
+     *
+     * @param host primary host address
+     */
+    public Node(HttpHost host) {
+        this(host, null, null, null, null, null);
+    }
+
+    /**
+     * Contact information for the host.
+     */
+    public HttpHost getHost() {
+        return host;
+    }
+
+    /**
+     * Addresses on which the host is listening. These are useful to have
+     * around because they allow you to find a host based on any address it
+     * is listening on.
+     */
+    public Set<HttpHost> getBoundHosts() {
+        return boundHosts;
+    }
+
+    /**
+     * The {@code node.name} of the node.
+     */
+    public String getName() {
+        return name;
+    }
+
+    /**
+     * Version of OpenSearch that the node is running or {@code null}
+     * if we don't know the version.
+     */
+    public String getVersion() {
+        return version;
+    }
+
+    /**
+     * Roles that the OpenSearch process on the host has or {@code null}
+     * if we don't know what roles the node has.
+     */
+    public Roles getRoles() {
+        return roles;
+    }
+
+    /**
+     * Attributes declared on the node.
+ */
+    public Map<String, List<String>> getAttributes() {
+        return attributes;
+    }
+
+    /**
+     * Convert node to string representation
+     */
+    @Override
+    public String toString() {
+        StringBuilder b = new StringBuilder();
+        b.append("[host=").append(host);
+        if (boundHosts != null) {
+            b.append(", bound=").append(boundHosts);
+        }
+        if (name != null) {
+            b.append(", name=").append(name);
+        }
+        if (version != null) {
+            b.append(", version=").append(version);
+        }
+        if (roles != null) {
+            b.append(", roles=").append(roles);
+        }
+        if (attributes != null) {
+            b.append(", attributes=").append(attributes);
+        }
+        return b.append(']').toString();
+    }
+
+    /**
+     * Compare two nodes for equality
+     * @param obj node instance to compare with
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == null || obj.getClass() != getClass()) {
+            return false;
+        }
+        Node other = (Node) obj;
+        return host.equals(other.host)
+            && Objects.equals(boundHosts, other.boundHosts)
+            && Objects.equals(name, other.name)
+            && Objects.equals(version, other.version)
+            && Objects.equals(roles, other.roles)
+            && Objects.equals(attributes, other.attributes);
+    }
+
+    /**
+     * Calculate the hash code of the node
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(host, boundHosts, name, version, roles, attributes);
+    }
+
+    /**
+     * Role information about an OpenSearch process.
+     */
+    public static final class Roles {
+
+        private final Set<String> roles;
+
+        /**
+         * Create a {@link Roles} instance of the given string set.
+         *
+         * @param roles set of role names.
+         */
+        public Roles(final Set<String> roles) {
+            this.roles = new TreeSet<>(roles);
+        }
+
+        /**
+         * Returns whether or not the node could be elected cluster-manager.
+         */
+        public boolean isClusterManagerEligible() {
+            return roles.contains("master") || roles.contains("cluster_manager");
+        }
+
+        /**
+         * Returns whether or not the node could be elected cluster-manager.
+         * @deprecated As of 2.2, in support of inclusive language, replaced by {@link #isClusterManagerEligible()}
+         */
+        @Deprecated
+        public boolean isMasterEligible() {
+            return isClusterManagerEligible();
+        }
+
+        /**
+         * Returns whether or not the node stores data.
+         */
+        public boolean isData() {
+            return roles.contains("data");
+        }
+
+        /**
+         * Returns whether or not the node runs ingest pipelines.
+         */
+        public boolean isIngest() {
+            return roles.contains("ingest");
+        }
+
+        /**
+         * Returns whether the node is dedicated to providing search capability.
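
Since everything except the host may be null, selectors have to be defensive when reading this metadata. A sketch of constructing a fully populated Node (hostnames, versions, and attribute values are made up):

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

import org.apache.hc.core5.http.HttpHost;
import org.opensearch.client.transport.httpclient5.internal.Node;

public class NodeDemo {
    public static void main(String[] args) {
        Set<String> roleNames = new HashSet<>();
        roleNames.add("cluster_manager");
        roleNames.add("data");
        Node node = new Node(
            new HttpHost("http", "opensearch-node1", 9200),                       // primary contact point
            Collections.singleton(new HttpHost("http", "10.0.0.5", 9200)),       // bound addresses
            "node-1",
            "2.4.0",
            new Node.Roles(roleNames),
            Collections.singletonMap("zone", Collections.singletonList("us-east-1a")));
        System.out.println(node);
        System.out.println("cluster-manager eligible: " + node.getRoles().isClusterManagerEligible());
    }
}
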
+ */
+        public boolean isSearch() {
+            return roles.contains("search");
+        }
+
+        /**
+         * Convert roles to string representation
+         */
+        @Override
+        public String toString() {
+            return String.join(",", roles);
+        }
+
+        /**
+         * Compare two roles for equality
+         * @param obj roles instance to compare with
+         */
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == null || obj.getClass() != getClass()) {
+                return false;
+            }
+            Roles other = (Roles) obj;
+            return roles.equals(other.roles);
+        }
+
+        /**
+         * Calculate the hash code of the roles
+         */
+        @Override
+        public int hashCode() {
+            return roles.hashCode();
+        }
+
+    }
+}
diff --git a/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/NodeSelector.java b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/NodeSelector.java
new file mode 100644
index 0000000000..eb11d8bb0b
--- /dev/null
+++ b/java-client/src/main/java/org/opensearch/client/transport/httpclient5/internal/NodeSelector.java
@@ -0,0 +1,105 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+/*
+ * Licensed to Elasticsearch under one or more contributor
+ * license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright
+ * ownership. Elasticsearch licenses this file to you under
+ * the Apache License, Version 2.0 (the "License"); you may
+ * not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+package org.opensearch.client.transport.httpclient5.internal;
+
+import java.util.Iterator;
+
+/**
+ * Selects nodes that can receive requests. Used to keep requests away
+ * from cluster-manager nodes or to send them to nodes with a particular attribute.
+ */
+public interface NodeSelector {
+    /**
+     * Select the {@link Node}s to which to send requests. This is called with
+     * a mutable {@link Iterable} of {@linkplain Node}s in the order that the
+     * rest client would prefer to use them, and implementers should remove
+     * nodes from the list that should not receive the request. Implementers may
+     * iterate the nodes as many times as they need.
+     * <p>
+     * This may be called twice per request: first for "living" nodes that
+     * have not been denylisted by previous errors. If the selector removes
+     * all nodes from the list or if there aren't any living nodes then the
+     * {@link org.opensearch.client.transport.httpclient5.ApacheHttpClient5Transport}
+     * will call this method with a list of "dead" nodes.
+     * <p>
+     * Implementers should not rely on the ordering of the nodes.
+     *
+     * @param nodes the {@link Node}s targeted for sending requests
+     */
+    void select(Iterable<Node> nodes);
+    /*
+     * We were fairly careful with our choice of Iterable here. The caller has
+     * a List but reordering the list is likely to break round robin. Luckily
+     * Iterable doesn't allow any reordering.
+     */
+
+    /**
+     * Selector that matches any node.
+     */
+    NodeSelector ANY = new NodeSelector() {
+        @Override
+        public void select(Iterable<Node> nodes) {
+            // Intentionally does nothing
+        }
+
+        @Override
+        public String toString() {
+            return "ANY";
+        }
+    };
+
+    /**
+     * Selector that skips dedicated cluster-manager nodes: nodes whose role
+     * metadata says they are cluster-manager eligible but hold neither the
+     * {@code data} nor the {@code ingest} role. Nodes without role metadata
+     * are kept.
+     */
+    NodeSelector SKIP_DEDICATED_CLUSTER_MANAGERS = new NodeSelector() {
+        @Override
+        public void select(Iterable<Node> nodes) {
+            for (Iterator<Node> itr = nodes.iterator(); itr.hasNext();) {
+                Node node = itr.next();
+                if (node.getRoles() == null) continue;
+                if (node.getRoles().isClusterManagerEligible()
+                    && false == node.getRoles().isData()
+                    && false == node.getRoles().isIngest()) {
+                    itr.remove();
+                }
+            }
+        }
+
+        @Override
+        public String toString() {
+            return "SKIP_DEDICATED_CLUSTER_MANAGERS";
+        }
+    };
+}
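
A custom selector follows the same removal-based contract as SKIP_DEDICATED_CLUSTER_MANAGERS above. A hypothetical selector that pins requests to one zone attribute (the attribute key and class name are made up):

import java.util.Iterator;

import org.opensearch.client.transport.httpclient5.internal.Node;
import org.opensearch.client.transport.httpclient5.internal.NodeSelector;

/**
 * Hypothetical selector that keeps only nodes declaring a given "zone" attribute.
 */
public class ZoneNodeSelector implements NodeSelector {
    private final String zone;

    public ZoneNodeSelector(String zone) {
        this.zone = zone;
    }

    @Override
    public void select(Iterable<Node> nodes) {
        for (Iterator<Node> itr = nodes.iterator(); itr.hasNext();) {
            Node node = itr.next();
            // Nodes without attribute metadata are kept: we cannot prove they are in the wrong zone.
            if (node.getAttributes() != null
                && node.getAttributes().containsKey("zone")
                && !node.getAttributes().get("zone").contains(zone)) {
                itr.remove();
            }
        }
    }

    @Override
    public String toString() {
        return "ZONE[" + zone + "]";
    }
}
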