Skip to content

Commit

Permalink
Address comments to simplify credential creation
Browse files Browse the repository at this point in the history
Signed-off-by: Aindriu Lavelle <[email protected]>
  • Loading branch information
aindriu-aiven committed Oct 10, 2024
1 parent 2df807f commit 21d5761
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 95 deletions.
16 changes: 16 additions & 0 deletions s3-connectors-common/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins { id("aiven-apache-kafka-connectors-all.java-conventions") }

val amazonS3Version by extra("1.12.729")
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

package io.aiven.kafka.connect.iam;

import java.util.Objects;

import io.aiven.kafka.connect.s3.S3BaseConfig;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;

Expand All @@ -32,12 +33,11 @@ public AWSCredentialsProvider getProvider(final S3BaseConfig config) {
if (config.hasAwsStsRole()) {
return getStsProvider(config);
}
final AwsAccessSecret awsCredentials = config.getAwsCredentials();
if (!awsCredentials.isValid()) {
final BasicAWSCredentials awsCredentials = config.getAwsCredentials();
if (Objects.isNull(awsCredentials)) {
return config.getCustomCredentialsProvider();
}
return new AWSStaticCredentialsProvider(new BasicAWSCredentials(awsCredentials.getAccessKeyId().value(),
awsCredentials.getSecretAccessKey().value()));
return new AWSStaticCredentialsProvider(awsCredentials);
}

private AWSCredentialsProvider getStsProvider(final S3BaseConfig config) {
Expand All @@ -52,11 +52,8 @@ private AWSCredentialsProvider getStsProvider(final S3BaseConfig config) {

private AWSSecurityTokenService securityTokenService(final S3BaseConfig config) {
if (config.hasStsEndpointConfig()) {
final AwsStsEndpointConfig endpointConfig = config.getStsEndpointConfig();
final AwsClientBuilder.EndpointConfiguration stsConfig = new AwsClientBuilder.EndpointConfiguration(
endpointConfig.getServiceEndpoint(), endpointConfig.getSigningRegion());
final AWSSecurityTokenServiceClientBuilder stsBuilder = AWSSecurityTokenServiceClientBuilder.standard();
stsBuilder.setEndpointConfiguration(stsConfig);
stsBuilder.setEndpointConfiguration(config.getAwsEndpointConfiguration());
return stsBuilder.build();
}
return AWSSecurityTokenServiceClientBuilder.defaultClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@
import io.aiven.kafka.connect.common.config.validators.NonEmptyPassword;
import io.aiven.kafka.connect.common.config.validators.OutputFieldsValidator;
import io.aiven.kafka.connect.common.config.validators.UrlValidator;
import io.aiven.kafka.connect.iam.AwsAccessSecret;
import io.aiven.kafka.connect.iam.AwsStsEndpointConfig;
import io.aiven.kafka.connect.iam.AwsStsRole;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.regions.Regions;
Expand Down Expand Up @@ -125,11 +126,11 @@ public class S3BaseConfig extends AbstractConfig {
// in other words we can't use values greater than 30
public static final int S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT = 3;

protected S3BaseConfig(ConfigDef definition, Map<?, ?> originals) { // NOPMD UnusedAssignment
super(definition, originals);
protected S3BaseConfig(ConfigDef definition, Map<String, String> originals) { // NOPMD UnusedAssignment
super(definition, handleDeprecatedYyyyUppercase(originals));
}

protected static Map<String, String> handleDeprecatedYyyyUppercase(final Map<String, String> properties) {
private static Map<String, String> handleDeprecatedYyyyUppercase(final Map<String, String> properties) {
if (!properties.containsKey(AWS_S3_PREFIX_CONFIG)) {
return properties;
}
Expand Down Expand Up @@ -216,34 +217,6 @@ protected static void addAwsConfigGroup(final ConfigDef configDef) {
ConfigDef.Width.NONE, AWS_S3_REGION_CONFIG);
}

protected static void addS3SinkConfig(final ConfigDef configDef) {
int awsS3SinkCounter = 0;

configDef.define(AWS_S3_PART_SIZE, Type.INT, DEFAULT_PART_SIZE, new ConfigDef.Validator() {

static final int MAX_BUFFER_SIZE = 2_000_000_000;

@Override
public void ensureValid(final String name, final Object value) {
if (value == null) {
throw new ConfigException(name, null, "Part size must be non-null");
}
final var number = (Number) value;
if (number.longValue() <= 0) {
throw new ConfigException(name, value, "Part size must be greater than 0");
}
if (number.longValue() > MAX_BUFFER_SIZE) {
throw new ConfigException(name, value,
"Part size must be no more: " + MAX_BUFFER_SIZE + " bytes (2GB)");
}
}
}, Importance.MEDIUM,
"The Part Size in S3 Multi-part Uploads in bytes. Maximum is " + Integer.MAX_VALUE
+ " (2GB) and default is " + DEFAULT_PART_SIZE + " (5MB)",
GROUP_AWS, awsS3SinkCounter++, // NOPMD UnusedAssignment
ConfigDef.Width.NONE, AWS_S3_PART_SIZE);
}

protected static void addAwsStsConfigGroup(final ConfigDef configDef) {
int awsStsGroupCounter = 0;
configDef.define(AWS_STS_ROLE_ARN, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(),
Expand Down Expand Up @@ -395,12 +368,22 @@ public AwsStsEndpointConfig getStsEndpointConfig() {
return new AwsStsEndpointConfig(getString(AWS_STS_CONFIG_ENDPOINT), getString(AWS_S3_REGION_CONFIG));
}

public AwsAccessSecret getNewAwsCredentials() {
return new AwsAccessSecret(getPassword(AWS_ACCESS_KEY_ID_CONFIG), getPassword(AWS_SECRET_ACCESS_KEY_CONFIG));
public AwsClientBuilder.EndpointConfiguration getAwsEndpointConfiguration() {
final AwsStsEndpointConfig config = getStsEndpointConfig();
return new AwsClientBuilder.EndpointConfiguration(config.getServiceEndpoint(), config.getSigningRegion());
}

public AwsAccessSecret getAwsCredentials() {
return getNewAwsCredentials().isValid() ? getNewAwsCredentials() : getOldAwsCredentials();
public BasicAWSCredentials getAwsCredentials() {
if (Objects.nonNull(getPassword(AWS_ACCESS_KEY_ID_CONFIG))
&& Objects.nonNull(getPassword(AWS_SECRET_ACCESS_KEY_CONFIG))) {
return new BasicAWSCredentials(getPassword(AWS_ACCESS_KEY_ID_CONFIG).value(),
getPassword(AWS_SECRET_ACCESS_KEY_CONFIG).value());
} else if (Objects.nonNull(getPassword(AWS_ACCESS_KEY_ID))
&& Objects.nonNull(getPassword(AWS_SECRET_ACCESS_KEY))) {
return new BasicAWSCredentials(getPassword(AWS_ACCESS_KEY_ID).value(),
getPassword(AWS_SECRET_ACCESS_KEY).value());
}
return null;
}

public String getAwsS3EndPoint() {
Expand Down Expand Up @@ -453,11 +436,8 @@ public int getS3RetryBackoffMaxRetries() {
return getInt(AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG);
}

public AwsAccessSecret getOldAwsCredentials() {
return new AwsAccessSecret(getPassword(AWS_ACCESS_KEY_ID), getPassword(AWS_SECRET_ACCESS_KEY));
}

public AWSCredentialsProvider getCustomCredentialsProvider() {
return getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AWSCredentialsProvider.class);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;

public class S3ClientUtils {
public class S3Utility {

private final AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@

public class AwsCredentialConfig extends S3BaseConfig {
public AwsCredentialConfig(final Map<String, String> properties) {
super(configDef(new ConfigDef()), handleDeprecatedYyyyUppercase(properties));
super(configDef(new ConfigDef()), properties);
}

public static ConfigDef configDef(ConfigDef configDef) { // NOPMD UnusedAssignment
addS3RetryPolicies(configDef);
addAwsConfigGroup(configDef);
addAwsStsConfigGroup(configDef);
addDeprecatedConfiguration(configDef);
addS3SinkConfig(configDef);
return configDef;
}
}

0 comments on commit 21d5761

Please sign in to comment.