Skip to content

Commit

Permalink
[adhoc] Prepare aws2 ClientConfiguration for json serialization and c…
Browse files Browse the repository at this point in the history
…leanup AWS Module (apache#16894)

* [adhoc] Prepare aws2 ClientConfiguration and related classes for json serialization and cleanup AWS Module
  • Loading branch information
Moritz Mack authored and nancyxu123 committed Mar 9, 2022
1 parent 62e0c81 commit 8478a0b
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 167 deletions.
1 change: 1 addition & 0 deletions sdks/java/io/amazon-web-services2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ ext.summary = "IO library to read and write Amazon Web Services services from Be

dependencies {
implementation library.java.vendored_guava_26_0_jre
implementation library.java.error_prone_annotations
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.aws_java_sdk2_apache_client
implementation library.java.aws_java_sdk2_netty_client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,21 @@

import static org.apache.beam.sdk.io.aws2.options.AwsSerializableUtils.deserializeAwsCredentialsProvider;
import static org.apache.beam.sdk.io.aws2.options.AwsSerializableUtils.serializeAwsCredentialsProvider;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import com.google.auto.value.extension.memoized.Memoized;
import java.io.Serializable;
import java.net.URI;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.checkerframework.dataflow.qual.Pure;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;

Expand All @@ -47,18 +49,28 @@
* uses a backoff strategy with equal jitter for computing the delay before the next retry.
*/
@AutoValue
@JsonInclude(value = JsonInclude.Include.NON_EMPTY)
@JsonDeserialize(builder = ClientConfiguration.Builder.class)
public abstract class ClientConfiguration implements Serializable {

/**
* Optional {@link AwsCredentialsProvider}. If set, this overwrites the default in {@link
* AwsOptions#getAwsCredentialsProvider()}.
*/
public abstract @Nullable @Pure AwsCredentialsProvider credentialsProvider();
@JsonProperty
@Memoized
public @Nullable @Pure AwsCredentialsProvider credentialsProvider() {
return credentialsProviderAsJson() != null
? deserializeAwsCredentialsProvider(credentialsProviderAsJson())
: null;
}

/**
* Optional {@link Region}. If set, this overwrites the default in {@link
* AwsOptions#getAwsRegion()}.
*/
@JsonProperty
@Memoized
public @Nullable @Pure Region region() {
return regionId() != null ? Region.of(regionId()) : null;
}
Expand All @@ -67,20 +79,24 @@ public abstract class ClientConfiguration implements Serializable {
* Optional service endpoint to use AWS compatible services instead, e.g. for testing. If set,
* this overwrites the default in {@link AwsOptions#getEndpoint()}.
*/
@JsonProperty
public abstract @Nullable @Pure URI endpoint();

/**
* Optional {@link RetryConfiguration} for AWS clients. If unset, retry behavior will be unchanged
* and use SDK defaults.
*/
@JsonProperty
public abstract @Nullable @Pure RetryConfiguration retry();

abstract @Nullable @Pure String regionId();

abstract @Nullable @Pure String credentialsProviderAsJson();

public abstract Builder toBuilder();

public static Builder builder() {
return new AutoValue_ClientConfiguration.Builder();
return Builder.builder();
}

public static ClientConfiguration create(
Expand All @@ -93,12 +109,20 @@ public static ClientConfiguration create(
}

@AutoValue.Builder
@JsonPOJOBuilder(withPrefix = "")
public abstract static class Builder {
@JsonCreator
static Builder builder() {
return new AutoValue_ClientConfiguration.Builder();
}

/**
* Optional {@link AwsCredentialsProvider}. If set, this overwrites the default in {@link
* AwsOptions#getAwsCredentialsProvider()}.
*/
public abstract Builder credentialsProvider(AwsCredentialsProvider credentialsProvider);
public Builder credentialsProvider(AwsCredentialsProvider credentialsProvider) {
return credentialsProviderAsJson(serializeAwsCredentialsProvider(credentialsProvider));
}

/**
* Optional {@link Region}. If set, this overwrites the default in {@link
Expand All @@ -118,6 +142,7 @@ public Builder region(Region region) {
* Optional {@link RetryConfiguration} for AWS clients. If unset, retry behavior will be
* unchanged and use SDK defaults.
*/
@JsonSetter
public abstract Builder retry(RetryConfiguration retry);

/**
Expand All @@ -132,58 +157,8 @@ public Builder retry(Consumer<RetryConfiguration.Builder> retry) {

abstract Builder regionId(String region);

abstract AwsCredentialsProvider credentialsProvider();

abstract ClientConfiguration autoBuild();
abstract Builder credentialsProviderAsJson(String credentialsProvider);

public ClientConfiguration build() {
if (credentialsProvider() != null) {
credentialsProvider(new SerializableAwsCredentialsProvider(credentialsProvider()));
}
return autoBuild();
}
}

/** Internal serializable {@link AwsCredentialsProvider}. */
private static class SerializableAwsCredentialsProvider
implements AwsCredentialsProvider, Serializable {
private transient AwsCredentialsProvider provider;
private String serializedProvider;

SerializableAwsCredentialsProvider(AwsCredentialsProvider provider) {
this.provider = checkNotNull(provider, "AwsCredentialsProvider cannot be null");
this.serializedProvider = serializeAwsCredentialsProvider(provider);
}

private void writeObject(ObjectOutputStream out) throws IOException {
out.writeUTF(serializedProvider);
}

private void readObject(ObjectInputStream in) throws IOException {
serializedProvider = in.readUTF();
provider = deserializeAwsCredentialsProvider(serializedProvider);
}

@Override
public AwsCredentials resolveCredentials() {
return provider.resolveCredentials();
}

@Override
public boolean equals(@Nullable Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SerializableAwsCredentialsProvider that = (SerializableAwsCredentialsProvider) o;
return serializedProvider.equals(that.serializedProvider);
}

@Override
public int hashCode() {
return serializedProvider.hashCode();
}
public abstract ClientConfiguration build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
*/
package org.apache.beam.sdk.io.aws2.common;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import javax.annotation.Nullable;
Expand All @@ -32,11 +37,14 @@
* HTTP Configuration</a>
*/
@AutoValue
@JsonInclude(value = JsonInclude.Include.NON_EMPTY)
@JsonDeserialize(builder = HttpClientConfiguration.Builder.class)
public abstract class HttpClientConfiguration implements Serializable {

/**
* Milliseconds to wait when acquiring a connection from the pool before giving up and timing out.
*/
@JsonProperty
public abstract @Nullable @Pure Integer connectionAcquisitionTimeout();

/**
Expand All @@ -45,12 +53,14 @@ public abstract class HttpClientConfiguration implements Serializable {
* <p>This will never close a connection that is currently in use, so long-lived connections may
* remain open longer than this time.
*/
@JsonProperty
public abstract @Nullable @Pure Integer connectionMaxIdleTime();

/**
* Milliseconds to wait when initially establishing a connection before giving up and timing out.
* A duration of 0 means infinity, and is not recommended.
*/
@JsonProperty
public abstract @Nullable @Pure Integer connectionTimeout();

/**
Expand All @@ -60,12 +70,14 @@ public abstract class HttpClientConfiguration implements Serializable {
* <p>This will never close a connection that is currently in use, so long-lived connections may
* remain open longer than this time.
*/
@JsonProperty
public abstract @Nullable @Pure Integer connectionTimeToLive();

/**
* Milliseconds to wait for data to be transferred over an established, open connection before the
* connection is timed out. A duration of 0 means infinity, and is not recommended.
*/
@JsonProperty
public abstract @Nullable @Pure Integer socketTimeout();

/**
Expand All @@ -75,6 +87,7 @@ public abstract class HttpClientConfiguration implements Serializable {
* <p>Note: Read timeout is only supported for async clients and ignored otherwise. Use {@link
* #socketTimeout()} instead.
*/
@JsonProperty
public abstract @Nullable @Pure Integer readTimeout();

/**
Expand All @@ -84,6 +97,7 @@ public abstract class HttpClientConfiguration implements Serializable {
* <p>Note: Write timeout is only supported for async clients and ignored otherwise. Use {@link
* #socketTimeout()} instead.
*/
@JsonProperty
public abstract @Nullable @Pure Integer writeTimeout();

/**
Expand All @@ -94,14 +108,21 @@ public abstract class HttpClientConfiguration implements Serializable {
* concurrent requests. When using HTTP/2 the number of connections that will be used depends on
* the max streams allowed per connection.
*/
@JsonProperty
public abstract @Nullable @Pure Integer maxConnections();

public static Builder builder() {
return new AutoValue_HttpClientConfiguration.Builder();
return Builder.builder();
}

@AutoValue.Builder
@JsonPOJOBuilder(withPrefix = "")
public abstract static class Builder {
@JsonCreator
static Builder builder() {
return new AutoValue_HttpClientConfiguration.Builder();
}

/**
* Milliseconds to wait when acquiring a connection from the pool before giving up and timing
* out.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,13 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.joda.time.Duration.ZERO;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.util.StdConverter;
import com.google.auto.value.AutoValue;
import java.io.Serializable;
import javax.annotation.Nullable;
Expand All @@ -36,31 +43,51 @@
* SdkDefaultRetrySetting} for further details.
*/
@AutoValue
@JsonInclude(value = JsonInclude.Include.NON_EMPTY)
@JsonDeserialize(builder = RetryConfiguration.Builder.class)
public abstract class RetryConfiguration implements Serializable {
private static final java.time.Duration BASE_BACKOFF = java.time.Duration.ofMillis(100);
private static final java.time.Duration THROTTLED_BASE_BACKOFF = java.time.Duration.ofSeconds(1);
private static final java.time.Duration MAX_BACKOFF = java.time.Duration.ofSeconds(20);

@JsonProperty
public abstract @Pure int numRetries();

@JsonProperty
@JsonSerialize(converter = DurationToMillis.class)
public abstract @Nullable @Pure Duration baseBackoff();

@JsonProperty
@JsonSerialize(converter = DurationToMillis.class)
public abstract @Nullable @Pure Duration throttledBaseBackoff();

@JsonProperty
@JsonSerialize(converter = DurationToMillis.class)
public abstract @Nullable @Pure Duration maxBackoff();

public abstract RetryConfiguration.Builder toBuilder();

public static Builder builder() {
return new AutoValue_RetryConfiguration.Builder();
return Builder.builder();
}

@AutoValue.Builder
@JsonPOJOBuilder(withPrefix = "")
public abstract static class Builder {
@JsonCreator
static Builder builder() {
return new AutoValue_RetryConfiguration.Builder();
}

public abstract Builder numRetries(int numRetries);

@JsonDeserialize(converter = MillisToDuration.class)
public abstract Builder baseBackoff(Duration baseBackoff);

@JsonDeserialize(converter = MillisToDuration.class)
public abstract Builder throttledBaseBackoff(Duration baseBackoff);

@JsonDeserialize(converter = MillisToDuration.class)
public abstract Builder maxBackoff(Duration maxBackoff);

abstract RetryConfiguration autoBuild();
Expand Down Expand Up @@ -115,4 +142,18 @@ RetryPolicy toClientRetryPolicy() {
private @Nullable static java.time.Duration toJava(@Nullable Duration duration) {
return duration == null ? null : java.time.Duration.ofMillis(duration.getMillis());
}

static class DurationToMillis extends StdConverter<Duration, Long> {
@Override
public Long convert(Duration duration) {
return duration.getMillis();
}
}

static class MillisToDuration extends StdConverter<Long, Duration> {
@Override
public Duration convert(Long millis) {
return Duration.millis(millis);
}
}
}
Loading

0 comments on commit 8478a0b

Please sign in to comment.