Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Refactor two-client usage. #386

Merged
merged 1 commit into from
Mar 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.utils.SdkAutoCloseable;

import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
Expand Down Expand Up @@ -70,154 +71,103 @@ public class AwsSdk2Transport implements OpenSearchTransport {
public static final Integer DEFAULT_REQUEST_COMPRESSION_SIZE = 8192;

private static final byte[] NO_BYTES = new byte[0];
private final SdkHttpClient httpClient;
private final SdkAsyncHttpClient asyncHttpClient;
private final SdkAutoCloseable httpClient;
private final String host;
private final String signingServiceName;
private final Region signingRegion;
private final JsonpMapper defaultMapper;
private final AwsSdk2TransportOptions transportOptions;

/**
* Create an {@link OpenSearchTransport} with a synchronous AWS HTTP client.
* Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP client.
* <p>
* Note that asynchronous OpenSearch requests sent through this transport will be dispatched
* *synchronously* on the calling thread.
*
* @param httpClient HTTP client to use for OpenSearch requests.
* @param asyncHttpClient Asynchronous HTTP client to use for OpenSearch requests.
* @param host The fully qualified domain name to connect to.
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@Nonnull SdkHttpClient httpClient,
@CheckForNull SdkAsyncHttpClient asyncHttpClient,
@Nonnull String host,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(httpClient, null, host, "es", signingRegion, options);
this(asyncHttpClient, host, "es", signingRegion, options);
}

/**
* Create an {@link OpenSearchTransport} with a synchronous AWS HTTP client.
* <p>
* Note that asynchronous OpenSearch requests sent through this transport will be dispatched
* *synchronously* on the calling thread.
*
* @param httpClient HTTP client to use for OpenSearch requests.
* @param syncHttpClient Synchronous HTTP client to use for OpenSearch requests.
* @param host The fully qualified domain name to connect to.
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@Nonnull SdkHttpClient httpClient,
@CheckForNull SdkHttpClient syncHttpClient,
@Nonnull String host,
@Nonnull String signingServiceName,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(httpClient, null, host, signingServiceName, signingRegion, options);
}

/**
* Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP client
* <p>
* Note that synchronous OpenSearch requests sent through this transport will be dispatched
* using the asynchronous client, but the calling thread will block until they are complete.
*
* @param asyncHttpClient HTTP client to use for OpenSearch requests.
* @param host The target host.
* @param signingRegion The AWS region for which requests will be signed. This should typically match region in `host`.
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@Nonnull SdkAsyncHttpClient asyncHttpClient,
@Nonnull String host,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(null, asyncHttpClient, host, "es", signingRegion, options);
this(syncHttpClient, host, "es", signingRegion, options);
}

/**
* Create an {@link OpenSearchTransport} with an asynchronous AWS HTTP client.
* <p>
* Note that synchronous OpenSearch requests sent through this transport will be dispatched
* using the asynchronous client, but the calling thread will block until they are complete.
* Note that asynchronous OpenSearch requests sent through this transport will be dispatched
* *synchronously* on the calling thread.
*
* @param asyncHttpClient HTTP client to use for OpenSearch requests.
* @param host The target host.
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
* @param asyncHttpClient Asynchronous HTTP client to use for OpenSearch requests.
* @param host The fully qualified domain name to connect to.
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@Nonnull SdkAsyncHttpClient asyncHttpClient,
@CheckForNull SdkAsyncHttpClient asyncHttpClient,
@Nonnull String host,
@Nonnull String signingServiceName,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(null, asyncHttpClient, host, signingServiceName, signingRegion, options);
this((SdkAutoCloseable) asyncHttpClient, host, signingServiceName, signingRegion, options);
}

/**
* Create an {@link OpenSearchTransport} with both synchronous and asynchronous AWS HTTP clients.
* <p>
* The synchronous client will be used for synchronous OpenSearch requests, and the asynchronous client
* will be used for asynchronous HTTP requests.
* Create an {@link OpenSearchTransport} with a synchronous AWS HTTP client.
*
* @param httpClient HTTP client to use for OpenSearch requests.
* @param asyncHttpClient HTTP client to use for synchronous OpenSearch requests.
* @param syncHttpClient Synchronous HTTP client to use for OpenSearch requests.
* @param host The fully qualified domain name to connect to.
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@CheckForNull SdkHttpClient httpClient,
@CheckForNull SdkAsyncHttpClient asyncHttpClient,
@CheckForNull SdkHttpClient syncHttpClient,
@Nonnull String host,
@Nonnull String signingServiceName,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
this(httpClient, asyncHttpClient, host, "es", signingRegion, options);
this((SdkAutoCloseable) syncHttpClient, host, signingServiceName, signingRegion, options);
}

/**
* Create an {@link OpenSearchTransport} with both synchronous and asynchronous AWS HTTP clients.
* <p>
* The synchronous client will be used for synchronous OpenSearch requests, and the asynchronous client
* will be used for asynchronous HTTP requests.
*
* @param httpClient HTTP client to use for OpenSearch requests.
* @param asyncHttpClient HTTP client to use for synchronous OpenSearch requests.
* @param host The fully qualified domain name to connect to.
* @param signingRegion The AWS region for which requests will be signed. This should typically match the region in `host`.
* @param signingServiceName The AWS signing service name, one of `es` (Amazon OpenSearch) or `aoss` (Amazon OpenSearch Serverless).
* @param options Options that apply to all requests. Can be null. Create with
* {@link AwsSdk2TransportOptions#builder()} and use these to specify non-default credentials,
* compression options, etc.
*/
public AwsSdk2Transport(
@CheckForNull SdkHttpClient httpClient,
@CheckForNull SdkAsyncHttpClient asyncHttpClient,
private AwsSdk2Transport(
@CheckForNull SdkAutoCloseable httpClient,
@Nonnull String host,
@Nonnull String signingServiceName,
@Nonnull Region signingRegion,
@CheckForNull AwsSdk2TransportOptions options) {
if (httpClient == null && asyncHttpClient == null)
{
throw new IllegalArgumentException("At least one SdkHttpClient or SdkAsyncHttpClient must be provided");
}
Objects.requireNonNull(host, "Target OpenSearch service host must not be null");
this.httpClient = httpClient;
this.asyncHttpClient = asyncHttpClient;
this.host = host;
this.signingServiceName = signingServiceName;
this.signingRegion = signingRegion;
Expand All @@ -237,11 +187,11 @@ public <RequestT, ResponseT, ErrorT> ResponseT performRequest(
OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options);
SdkHttpFullRequest clientReq = prepareRequest(request, endpoint, options, requestBody);

if (httpClient != null) {
return executeSync(clientReq, endpoint, options);
} else {
if (httpClient instanceof SdkHttpClient) {
return executeSync((SdkHttpClient) httpClient, clientReq, endpoint, options);
} else if (httpClient instanceof SdkAsyncHttpClient) {
try {
return executeAsync(clientReq, requestBody, endpoint, options).get();
return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options).get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause != null) {
Expand All @@ -257,6 +207,8 @@ public <RequestT, ResponseT, ErrorT> ResponseT performRequest(
} catch (InterruptedException e) {
throw new IOException("HttpRequest was interrupted", e);
}
} else {
throw new IOException("invalid httpClient: " + httpClient);
}
}

Expand All @@ -269,11 +221,13 @@ public <RequestT, ResponseT, ErrorT> CompletableFuture<ResponseT> performRequest
try {
OpenSearchRequestBodyBuffer requestBody = prepareRequestBody(request, endpoint, options);
SdkHttpFullRequest clientReq = prepareRequest(request, endpoint, options, requestBody);
if (asyncHttpClient != null) {
return executeAsync(clientReq, requestBody, endpoint, options);
} else {
ResponseT result = executeSync(clientReq, endpoint, options);
if (httpClient instanceof SdkAsyncHttpClient) {
return executeAsync((SdkAsyncHttpClient) httpClient, clientReq, requestBody, endpoint, options);
} else if (httpClient instanceof SdkHttpClient) {
ResponseT result = executeSync((SdkHttpClient) httpClient, clientReq, endpoint, options);
return CompletableFuture.completedFuture(result);
} else {
throw new IOException("invalid httpClient: " + httpClient);
}
} catch (Throwable e) {
CompletableFuture<ResponseT> cf = new CompletableFuture<>();
Expand Down Expand Up @@ -421,6 +375,7 @@ private void applyOptionsHeaders(SdkHttpFullRequest.Builder builder, TransportOp
}

private <ResponseT> ResponseT executeSync(
SdkHttpClient syncHttpClient,
SdkHttpFullRequest httpRequest,
Endpoint<?, ResponseT, ?> endpoint,
TransportOptions options
Expand All @@ -430,7 +385,7 @@ private <ResponseT> ResponseT executeSync(
if (httpRequest.contentStreamProvider().isPresent()) {
executeRequest.contentStreamProvider(httpRequest.contentStreamProvider().get());
}
HttpExecuteResponse executeResponse = httpClient.prepareRequest(executeRequest.build()).call();
HttpExecuteResponse executeResponse = syncHttpClient.prepareRequest(executeRequest.build()).call();
AbortableInputStream bodyStream = null;
try {
bodyStream = executeResponse.responseBody().orElse(null);
Expand All @@ -444,6 +399,7 @@ private <ResponseT> ResponseT executeSync(
}

private <ResponseT> CompletableFuture<ResponseT> executeAsync(
SdkAsyncHttpClient asyncHttpClient,
SdkHttpFullRequest httpRequest,
@CheckForNull OpenSearchRequestBodyBuffer requestBody,
Endpoint<?, ResponseT, ?> endpoint,
Expand Down Expand Up @@ -546,7 +502,6 @@ private <ResponseT, ErrorT> ResponseT parseResponse(
ResponseT response = (ResponseT) new BooleanResponse(bep.getResult(statusCode));
return response;
} else if (endpoint instanceof JsonEndpoint) {
@SuppressWarnings("unchecked")
JsonEndpoint<?, ResponseT, ?> jsonEndpoint = (JsonEndpoint<?, ResponseT, ?>) endpoint;
// Successful response
ResponseT response = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

package org.opensearch.client.opensearch.integTest.aws;


import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -42,7 +41,6 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;


public abstract class AwsSdk2TransportTestCase {
public static final String TEST_INDEX = "opensearch-java-integtest";

Expand Down Expand Up @@ -85,16 +83,14 @@ protected OpenSearchClient getClient(
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
} else {
transport = new AwsSdk2Transport(
getHttpClient(),
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
}
return new OpenSearchClient(transport);
}
Expand All @@ -111,16 +107,14 @@ protected OpenSearchAsyncClient getAsyncClient(
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
} else {
transport = new AwsSdk2Transport(
getHttpClient(),
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
}
return new OpenSearchAsyncClient(transport);
}
Expand All @@ -137,16 +131,14 @@ protected OpenSearchIndicesClient getIndexesClient(
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
} else {
transport = new AwsSdk2Transport(
getHttpClient(),
getTestClusterHost(),
getTestClusterServiceName(),
getTestClusterRegion(),
getTransportOptions().build()
);
getTransportOptions().build());
}
return new OpenSearchIndicesClient(transport);
}
Expand All @@ -171,13 +163,15 @@ public static void cleanupClients() throws IOException {
if (httpClient != null) {
try {
httpClient.close();
httpClient = null;
} catch (Throwable e) {
// Not our problem
}
}
if (asyncHttpClient != null) {
try {
asyncHttpClient.close();
asyncHttpClient = null;
} catch (Throwable e) {
// Not our problem
}
Expand All @@ -204,10 +198,8 @@ public void resetTestIndex(boolean async) throws Exception {
IndexState indexInfo = client.get(b -> b.index(TEST_INDEX)).get(TEST_INDEX);
if (indexInfo != null) {
indexExists = true;

}
} catch (
OpenSearchException e) {
} catch (OpenSearchException e) {
if (e.status() != 404) {
throw e;
}
Expand Down Expand Up @@ -237,17 +229,14 @@ protected SearchResponse<SimplePojo> query(OpenSearchClient client, String title
.ignoreThrottled(false)
.sort(
new SortOptions.Builder().score(o -> o.order(SortOrder.Desc)).build(),
new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build()
)
new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build())
.query(query);


return client.search(req.build(), SimplePojo.class);
}

protected CompletableFuture<SearchResponse<SimplePojo>> query(
OpenSearchAsyncClient client, String title, String text
) {
OpenSearchAsyncClient client, String title, String text) {
var query = Query.of(qb -> {
if (title != null) {
qb.match(mb -> mb.field("title").query(vb -> vb.stringValue(title)));
Expand All @@ -264,8 +253,7 @@ protected CompletableFuture<SearchResponse<SimplePojo>> query(
.ignoreThrottled(false)
.sort(
new SortOptions.Builder().score(o -> o.order(SortOrder.Desc)).build(),
new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build()
)
new SortOptions.Builder().doc(o -> o.order(SortOrder.Desc)).build())
.query(query);

try {
Expand Down