diff --git a/s3-connectors-common/.gitattributes b/s3-connectors-common/.gitattributes new file mode 100644 index 000000000..f91f64602 --- /dev/null +++ b/s3-connectors-common/.gitattributes @@ -0,0 +1,12 @@ +# +# https://help.github.com/articles/dealing-with-line-endings/ +# +# Linux start script should use lf +/gradlew text eol=lf + +# These are Windows script files and should use crlf +*.bat text eol=crlf + +# Binary files should be left untouched +*.jar binary + diff --git a/s3-connectors-common/.gitignore b/s3-connectors-common/.gitignore new file mode 100644 index 000000000..9ac64b3ff --- /dev/null +++ b/s3-connectors-common/.gitignore @@ -0,0 +1,12 @@ +target +docs/_build +.idea +*.iml +*.DS_Store +.gradle +.gradletasknamecache +build/ +rpm/ +rpmbuild/ +lib/ +*.sh diff --git a/s3-connectors-common/README.md b/s3-connectors-common/README.md new file mode 100644 index 000000000..6cc542848 --- /dev/null +++ b/s3-connectors-common/README.md @@ -0,0 +1,11 @@ +# Aiven's Common Module for S3 Connectors for Apache Kafka® + +Shared common functionality for S3 connectivity and configuration + +# License + +This project is licensed under the [Apache License, Version 2.0](../LICENSE). + +# Trademarks + +Apache Kafka, Apache Kafka Connect and Apache Maven are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries. \ No newline at end of file diff --git a/s3-connectors-common/build.gradle.kts b/s3-connectors-common/build.gradle.kts new file mode 100644 index 000000000..0bfd55294 --- /dev/null +++ b/s3-connectors-common/build.gradle.kts @@ -0,0 +1,172 @@ +plugins { id("aiven-apache-kafka-connectors-all.java-conventions") } + +val amazonS3Version by extra("1.12.729") +val amazonSTSVersion by extra("1.12.729") +val s3mockVersion by extra("0.2.6") + +dependencies { + implementation("com.amazonaws:aws-java-sdk-s3:$amazonS3Version") + implementation("com.amazonaws:aws-java-sdk-sts:$amazonSTSVersion") + + implementation(project(":commons")) + + compileOnly(apache.kafka.connect.api) + compileOnly(apache.kafka.connect.runtime) + compileOnly(apache.kafka.connect.json) + + implementation(confluent.kafka.connect.avro.data) { + exclude(group = "org.apache.kafka", module = "kafka-clients") + } + + implementation(tools.spotbugs.annotations) + implementation(compressionlibs.snappy) + implementation(compressionlibs.zstd.jni) + + implementation(logginglibs.slf4j) + + implementation(apache.commons.text) + + implementation(apache.parquet.avro) { + exclude(group = "org.xerial.snappy", module = "snappy-java") + exclude(group = "org.slf4j", module = "slf4j-api") + exclude(group = "org.apache.avro", module = "avro") + } + implementation(apache.hadoop.common) { + exclude(group = "org.apache.hadoop.thirdparty", module = "hadoop-shaded-protobuf_3_7") + exclude(group = "com.google.guava", module = "guava") + exclude(group = "commons-cli", module = "commons-cli") + exclude(group = "org.apache.commons", module = "commons-math3") + exclude(group = "org.apache.httpcomponents", module = "httpclient") + exclude(group = "commons-codec", module = "commons-codec") + exclude(group = "commons-io", module = "commons-io") + exclude(group = "commons-net", module = "commons-net") + exclude(group = "org.eclipse.jetty") + exclude(group = "org.eclipse.jetty.websocket") + exclude(group = "javax.servlet") + exclude(group = "javax.servlet.jsp") + exclude(group = "javax.activation") + exclude(group = "com.sun.jersey") + exclude(group = "log4j") + exclude(group = "org.apache.commons", module = "commons-text") + exclude(group = "org.slf4j", module = "slf4j-api") + exclude(group = "org.apache.hadoop", module = "hadoop-auth") + exclude(group = "org.apache.hadoop", module = "hadoop-yarn-api") + exclude(group = "com.google.re2j") + exclude(group = "com.google.protobuf") + exclude(group = "com.google.code.gson") + exclude(group = "com.jcraft") + exclude(group = "org.apache.curator") + exclude(group = "org.apache.zookeeper") + exclude(group = "org.apache.htrace") + exclude(group = "com.google.code.findbugs") + exclude(group = "org.apache.kerby") + exclude(group = "com.fasterxml.jackson.core") + exclude(group = "com.fasterxml.woodstox", module = "woodstox-core:5.0.3") + exclude(group = "org.apache.avro", module = "avro") + exclude(group = "org.apache.hadoop", module = "hadoop-yarn-common") + exclude(group = "com.google.inject.extensions", module = "guice-servlet") + exclude(group = "io.netty", module = "netty") + } + + testImplementation(apache.kafka.connect.api) + testImplementation(apache.kafka.connect.runtime) + testImplementation(apache.kafka.connect.json) + testImplementation(testinglibs.junit.jupiter) + testImplementation(apache.parquet.tools) { exclude(group = "org.slf4j", module = "slf4j-api") } + testImplementation(jackson.databind) + testImplementation(testinglibs.mockito.core) + testImplementation(testinglibs.assertj.core) + + testImplementation(testinglibs.woodstox.stax2.api) + testImplementation(apache.hadoop.mapreduce.client.core) + testImplementation(confluent.kafka.connect.avro.converter) + + testRuntimeOnly(testinglibs.junit.jupiter.engine) + testRuntimeOnly(logginglibs.logback.classic) +} + +tasks.withType { archiveBaseName.set(project.name + "-for-apache-kafka-connect") } + +distributions { main { distributionBaseName.set(project.name + "-for-apache-kafka-connect") } } + +publishing { + publications { + create("publishMavenJavaArtifact") { + groupId = group.toString() + artifactId = "s3-connectors-common-for-apache-kafka-connect" + version = version.toString() + + from(components["java"]) + + pom { + name = "Aiven's Common Module for Apache Kafka connectors" + description = "Aiven's Common Module for Apache Kafka connectors" + url = "https://github.com/Aiven-Open/cloud-storage-connectors-for-apache-kafka" + organization { + name = "Aiven Oy" + url = "https://aiven.io" + } + + licenses { + license { + name = "Apache 2.0" + url = "http://www.apache.org/licenses/LICENSE-2.0" + distribution = "repo" + } + } + + developers { + developer { + id = "aiven" + name = "Aiven Opensource" + email = "opensource@aiven.io" + } + } + + scm { + connection = + "scm:git:git://github.com:Aiven-Open/cloud-storage-connectors-for-apache-kafka.git" + developerConnection = + "scm:git:ssh://github.com:Aiven-Open/cloud-storage-connectors-for-apache-kafka.git" + url = "https://github.com/Aiven-Open/cloud-storage-connectors-for-apache-kafka" + } + } + } + } + + repositories { + maven { + name = "sonatype" + + val releasesRepoUrl = uri("https://oss.sonatype.org/service/local/staging/deploy/maven2") + val snapshotsRepoUrl = uri("https://oss.sonatype.org/content/repositories/snapshots") + url = if (version.toString().endsWith("SNAPSHOT")) snapshotsRepoUrl else releasesRepoUrl + + credentials(PasswordCredentials::class) + } + } +} + +signing { + sign(publishing.publications["publishMavenJavaArtifact"]) + useGpgCmd() + // Some issue in the plugin: + // GPG outputs already armored signatures. The plugin also does armoring for `asc` files. + // This results in double armored signatures, i.e. garbage. + // Override the signature type provider to use unarmored output for `asc` files, which works well + // with GPG. + class ASCSignatureProvider : AbstractSignatureTypeProvider() { + val binary = + object : BinarySignatureType() { + override fun getExtension(): String { + return "asc" + } + } + + init { + register(binary) + setDefaultType(binary.extension) + } + } + signatureTypes = ASCSignatureProvider() +} diff --git a/s3-connectors-common/src/main/java/io/aiven/kafka/connect/iam/AwsAccessSecret.java b/s3-connectors-common/src/main/java/io/aiven/kafka/connect/iam/AwsAccessSecret.java new file mode 100644 index 000000000..2f7c9d080 --- /dev/null +++ b/s3-connectors-common/src/main/java/io/aiven/kafka/connect/iam/AwsAccessSecret.java @@ -0,0 +1,43 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.iam; + +import java.util.Objects; + +import org.apache.kafka.common.config.types.Password; + +public final class AwsAccessSecret { + private final Password accessKeyId; + private final Password secretAccessKey; + + public AwsAccessSecret(final Password accessKeyId, final Password secretAccessKey) { + this.accessKeyId = accessKeyId; + this.secretAccessKey = secretAccessKey; + } + + public Password getAccessKeyId() { + return accessKeyId; + } + + public Password getSecretAccessKey() { + return secretAccessKey; + } + + public Boolean isValid() { + return Objects.nonNull(accessKeyId) && Objects.nonNull(secretAccessKey); + } +} diff --git a/s3-connectors-common/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java b/s3-connectors-common/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java new file mode 100644 index 000000000..a7007ddeb --- /dev/null +++ b/s3-connectors-common/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java @@ -0,0 +1,64 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.iam; + +import io.aiven.kafka.connect.s3.S3BaseConfig; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.services.securitytoken.AWSSecurityTokenService; +import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder; + +public class AwsCredentialProviderFactory { + + public AWSCredentialsProvider getProvider(final S3BaseConfig config) { + if (config.hasAwsStsRole()) { + return getStsProvider(config); + } + final AwsAccessSecret awsCredentials = config.getAwsCredentials(); + if (!awsCredentials.isValid()) { + return config.getCustomCredentialsProvider(); + } + return new AWSStaticCredentialsProvider(new BasicAWSCredentials(awsCredentials.getAccessKeyId().value(), + awsCredentials.getSecretAccessKey().value())); + } + + private AWSCredentialsProvider getStsProvider(final S3BaseConfig config) { + final AwsStsRole awsstsRole = config.getStsRole(); + final AWSSecurityTokenService sts = securityTokenService(config); + return new STSAssumeRoleSessionCredentialsProvider.Builder(awsstsRole.getArn(), awsstsRole.getSessionName()) + .withStsClient(sts) + .withExternalId(awsstsRole.getExternalId()) + .withRoleSessionDurationSeconds(awsstsRole.getSessionDurationSeconds()) + .build(); + } + + private AWSSecurityTokenService securityTokenService(final S3BaseConfig config) { + if (config.hasStsEndpointConfig()) { + final AwsStsEndpointConfig endpointConfig = config.getStsEndpointConfig(); + final AwsClientBuilder.EndpointConfiguration stsConfig = new AwsClientBuilder.EndpointConfiguration( + endpointConfig.getServiceEndpoint(), endpointConfig.getSigningRegion()); + final AWSSecurityTokenServiceClientBuilder stsBuilder = AWSSecurityTokenServiceClientBuilder.standard(); + stsBuilder.setEndpointConfiguration(stsConfig); + return stsBuilder.build(); + } + return AWSSecurityTokenServiceClientBuilder.defaultClient(); + } +} diff --git a/s3-connectors-common/src/main/java/io/aiven/kafka/connect/iam/AwsStsEndpointConfig.java b/s3-connectors-common/src/main/java/io/aiven/kafka/connect/iam/AwsStsEndpointConfig.java new file mode 100644 index 000000000..49310be7f --- /dev/null +++ b/s3-connectors-common/src/main/java/io/aiven/kafka/connect/iam/AwsStsEndpointConfig.java @@ -0,0 +1,43 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.iam; + +import java.util.Objects; + +public final class AwsStsEndpointConfig { + public static final String AWS_STS_GLOBAL_ENDPOINT = "https://sts.amazonaws.com"; + + private final String serviceEndpoint; + private final String signingRegion; + + public AwsStsEndpointConfig(final String serviceEndpoint, final String signingRegion) { + this.serviceEndpoint = serviceEndpoint; + this.signingRegion = signingRegion; + } + + public String getServiceEndpoint() { + return serviceEndpoint; + } + + public String getSigningRegion() { + return signingRegion; + } + + public Boolean isValid() { + return Objects.nonNull(signingRegion); + } +} diff --git a/s3-connectors-common/src/main/java/io/aiven/kafka/connect/iam/AwsStsRole.java b/s3-connectors-common/src/main/java/io/aiven/kafka/connect/iam/AwsStsRole.java new file mode 100644 index 000000000..f62c2e540 --- /dev/null +++ b/s3-connectors-common/src/main/java/io/aiven/kafka/connect/iam/AwsStsRole.java @@ -0,0 +1,62 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.iam; + +import java.util.Objects; + +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; + +public final class AwsStsRole { + + // AssumeRole request limit details here: + // https://docs.aws.amazon.com/STS/latest/APIReference/API_AssumeRole.html + public static final int MIN_SESSION_DURATION = STSAssumeRoleSessionCredentialsProvider.DEFAULT_DURATION_SECONDS; + public static final int MAX_SESSION_DURATION = 43_200; + + private final String arn; + private final String externalId; + private final String sessionName; + private final int sessionDurationSeconds; + + public AwsStsRole(final String arn, final String externalId, final String sessionName, + final int sessionDurationSeconds) { + this.arn = arn; + this.externalId = externalId; + this.sessionName = sessionName; + this.sessionDurationSeconds = sessionDurationSeconds; + } + + public String getArn() { + return arn; + } + + public String getExternalId() { + return externalId; + } + + public String getSessionName() { + return sessionName; + } + + public int getSessionDurationSeconds() { + return sessionDurationSeconds; + } + + public Boolean isValid() { + return Objects.nonNull(arn) && Objects.nonNull(sessionName); + } +} diff --git a/s3-connectors-common/src/main/java/io/aiven/kafka/connect/s3/S3BaseConfig.java b/s3-connectors-common/src/main/java/io/aiven/kafka/connect/s3/S3BaseConfig.java new file mode 100644 index 000000000..d58141e97 --- /dev/null +++ b/s3-connectors-common/src/main/java/io/aiven/kafka/connect/s3/S3BaseConfig.java @@ -0,0 +1,463 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.s3; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigException; + +import io.aiven.kafka.connect.common.config.validators.FileCompressionTypeValidator; +import io.aiven.kafka.connect.common.config.validators.NonEmptyPassword; +import io.aiven.kafka.connect.common.config.validators.OutputFieldsValidator; +import io.aiven.kafka.connect.common.config.validators.UrlValidator; +import io.aiven.kafka.connect.iam.AwsAccessSecret; +import io.aiven.kafka.connect.iam.AwsStsEndpointConfig; +import io.aiven.kafka.connect.iam.AwsStsRole; + +import com.amazonaws.auth.AWSCredentialsProvider; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.RegionUtils; +import com.amazonaws.regions.Regions; +import com.amazonaws.services.s3.internal.BucketNameUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class S3BaseConfig extends AbstractConfig { + public static final Logger LOGGER = LoggerFactory.getLogger(S3BaseConfig.class); + + @Deprecated + public static final String OUTPUT_COMPRESSION = "output_compression"; + @Deprecated + public static final String OUTPUT_COMPRESSION_TYPE_GZIP = "gzip"; + @Deprecated + public static final String OUTPUT_COMPRESSION_TYPE_NONE = "none"; + + @Deprecated + public static final String OUTPUT_FIELDS = "output_fields"; + @Deprecated + public static final String TIMESTAMP_TIMEZONE = "timestamp.timezone"; + @Deprecated + public static final String TIMESTAMP_SOURCE = "timestamp.source"; + @Deprecated + public static final String OUTPUT_FIELD_NAME_KEY = "key"; + @Deprecated + public static final String OUTPUT_FIELD_NAME_OFFSET = "offset"; + @Deprecated + public static final String OUTPUT_FIELD_NAME_TIMESTAMP = "timestamp"; + @Deprecated + public static final String OUTPUT_FIELD_NAME_VALUE = "value"; + @Deprecated + public static final String OUTPUT_FIELD_NAME_HEADERS = "headers"; + + @Deprecated + public static final String AWS_ACCESS_KEY_ID = "aws_access_key_id"; + @Deprecated + public static final String AWS_SECRET_ACCESS_KEY = "aws_secret_access_key"; + @Deprecated + public static final String AWS_S3_BUCKET = "aws_s3_bucket"; + @Deprecated + public static final String AWS_S3_ENDPOINT = "aws_s3_endpoint"; + @Deprecated + public static final String AWS_S3_REGION = "aws_s3_region"; + @Deprecated + public static final String AWS_S3_PREFIX = "aws_s3_prefix"; + + public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id"; + public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret.access.key"; + public static final String AWS_CREDENTIALS_PROVIDER_CONFIG = "aws.credentials.provider"; + public static final String AWS_CREDENTIAL_PROVIDER_DEFAULT = "com.amazonaws.auth.DefaultAWSCredentialsProviderChain"; + public static final String AWS_S3_BUCKET_NAME_CONFIG = "aws.s3.bucket.name"; + public static final String AWS_S3_SSE_ALGORITHM_CONFIG = "aws.s3.sse.algorithm"; + public static final String AWS_S3_ENDPOINT_CONFIG = "aws.s3.endpoint"; + public static final String AWS_S3_REGION_CONFIG = "aws.s3.region"; + public static final String AWS_S3_PART_SIZE = "aws.s3.part.size.bytes"; + + public static final String AWS_S3_PREFIX_CONFIG = "aws.s3.prefix"; + public static final String AWS_STS_ROLE_ARN = "aws.sts.role.arn"; + public static final String AWS_STS_ROLE_EXTERNAL_ID = "aws.sts.role.external.id"; + public static final String AWS_STS_ROLE_SESSION_NAME = "aws.sts.role.session.name"; + public static final String AWS_STS_ROLE_SESSION_DURATION = "aws.sts.role.session.duration"; + public static final String AWS_STS_CONFIG_ENDPOINT = "aws.sts.config.endpoint"; + + public static final String AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG = "aws.s3.backoff.delay.ms"; + public static final String AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG = "aws.s3.backoff.max.delay.ms"; + public static final String AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG = "aws.s3.backoff.max.retries"; + + private static final String GROUP_AWS = "AWS"; + private static final String GROUP_AWS_STS = "AWS STS"; + + private static final String GROUP_S3_RETRY_BACKOFF_POLICY = "S3 retry backoff policy"; + + public static final int DEFAULT_PART_SIZE = 5 * 1024 * 1024; + + // Default values from AWS SDK, since they are hidden + public static final int AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT = 100; + public static final int AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT = 20_000; + // Comment in AWS SDK for max retries: + // Maximum retry limit. Avoids integer overflow issues. + // + // NOTE: If the value is greater than 30, there can be integer overflow + // issues during delay calculation. + // in other words we can't use values greater than 30 + public static final int S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT = 3; + + protected S3BaseConfig(ConfigDef definition, Map originals) { // NOPMD UnusedAssignment + super(definition, originals); + } + + protected static Map handleDeprecatedYyyyUppercase(final Map properties) { + if (!properties.containsKey(AWS_S3_PREFIX_CONFIG)) { + return properties; + } + + final var result = new HashMap<>(properties); + for (final var prop : List.of(AWS_S3_PREFIX_CONFIG)) { + if (properties.containsKey(prop)) { + String template = properties.get(prop); + final String originalTemplate = template; + + final var unitYyyyPattern = Pattern.compile("\\{\\{\\s*timestamp\\s*:\\s*unit\\s*=\\s*YYYY\\s*}}"); + template = unitYyyyPattern.matcher(template) + .replaceAll(matchResult -> matchResult.group().replace("YYYY", "yyyy")); + + if (!template.equals(originalTemplate)) { + LOGGER.warn("{{timestamp:unit=YYYY}} is no longer supported, " + + "please use {{timestamp:unit=yyyy}} instead. " + "It was automatically replaced: {}", + template); + } + + result.put(prop, template); + } + } + return result; + } + + protected static void addS3RetryPolicies(final ConfigDef configDef) { + var retryPolicyGroupCounter = 0; + configDef.define(AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG, ConfigDef.Type.LONG, + AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT, ConfigDef.Range.atLeast(1L), ConfigDef.Importance.MEDIUM, + "S3 default base sleep time for non-throttled exceptions in milliseconds. " + "Default is " + + AWS_S3_RETRY_BACKOFF_DELAY_MS_DEFAULT + ".", + GROUP_S3_RETRY_BACKOFF_POLICY, retryPolicyGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.NONE, AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG); + configDef.define(AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG, ConfigDef.Type.LONG, + AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT, ConfigDef.Range.atLeast(1L), ConfigDef.Importance.MEDIUM, + "S3 maximum back-off time before retrying a request in milliseconds. " + "Default is " + + AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_DEFAULT + ".", + GROUP_S3_RETRY_BACKOFF_POLICY, retryPolicyGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.NONE, AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG); + configDef.define(AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG, ConfigDef.Type.INT, + S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT, ConfigDef.Range.between(1L, 30), ConfigDef.Importance.MEDIUM, + "Maximum retry limit " + "(if the value is greater than 30, " + + "there can be integer overflow issues during delay calculation). " + "Default is " + + S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT + ".", + GROUP_S3_RETRY_BACKOFF_POLICY, retryPolicyGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.NONE, AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG); + } + + protected static void addAwsConfigGroup(final ConfigDef configDef) { + int awsGroupCounter = 0; + + configDef.define(AWS_ACCESS_KEY_ID_CONFIG, Type.PASSWORD, null, new NonEmptyPassword(), Importance.MEDIUM, + "AWS Access Key ID", GROUP_AWS, awsGroupCounter++, ConfigDef.Width.NONE, AWS_ACCESS_KEY_ID_CONFIG); + + configDef.define(AWS_SECRET_ACCESS_KEY_CONFIG, Type.PASSWORD, null, new NonEmptyPassword(), Importance.MEDIUM, + "AWS Secret Access Key", GROUP_AWS, awsGroupCounter++, ConfigDef.Width.NONE, + AWS_SECRET_ACCESS_KEY_CONFIG); + + configDef.define(AWS_CREDENTIALS_PROVIDER_CONFIG, ConfigDef.Type.CLASS, AWS_CREDENTIAL_PROVIDER_DEFAULT, + ConfigDef.Importance.MEDIUM, + "When you initialize a new " + "service client without supplying any arguments, " + + "the AWS SDK for Java attempts to find temporary " + + "credentials by using the default credential " + "provider chain implemented by the " + + "DefaultAWSCredentialsProviderChain class.", + + GROUP_AWS, awsGroupCounter++, ConfigDef.Width.NONE, AWS_CREDENTIALS_PROVIDER_CONFIG); + + configDef.define(AWS_S3_BUCKET_NAME_CONFIG, Type.STRING, null, new BucketNameValidator(), Importance.MEDIUM, + "AWS S3 Bucket name", GROUP_AWS, awsGroupCounter++, ConfigDef.Width.NONE, AWS_S3_BUCKET_NAME_CONFIG); + + // AWS S3 Server Side Encryption Algorithm configuration + // Example values: 'AES256' for S3-managed keys, 'aws:kms' for AWS KMS-managed keys + configDef.define(AWS_S3_SSE_ALGORITHM_CONFIG, Type.STRING, null, new ConfigDef.NonEmptyString(), + Importance.MEDIUM, "AWS S3 Server Side Encryption Algorithm. Example values: 'AES256', 'aws:kms'.", + GROUP_AWS, awsGroupCounter++, ConfigDef.Width.NONE, AWS_S3_SSE_ALGORITHM_CONFIG); + + configDef.define(AWS_S3_ENDPOINT_CONFIG, Type.STRING, null, new UrlValidator(), Importance.LOW, + "Explicit AWS S3 Endpoint Address, mainly for testing", GROUP_AWS, awsGroupCounter++, + ConfigDef.Width.NONE, AWS_S3_ENDPOINT_CONFIG); + + configDef.define(AWS_S3_REGION_CONFIG, Type.STRING, null, new AwsRegionValidator(), Importance.MEDIUM, + "AWS S3 Region, e.g. us-east-1", GROUP_AWS, awsGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.NONE, AWS_S3_REGION_CONFIG); + } + + protected static void addS3SinkConfig(final ConfigDef configDef) { + int awsS3SinkCounter = 0; + + configDef.define(AWS_S3_PART_SIZE, Type.INT, DEFAULT_PART_SIZE, new ConfigDef.Validator() { + + static final int MAX_BUFFER_SIZE = 2_000_000_000; + + @Override + public void ensureValid(final String name, final Object value) { + if (value == null) { + throw new ConfigException(name, null, "Part size must be non-null"); + } + final var number = (Number) value; + if (number.longValue() <= 0) { + throw new ConfigException(name, value, "Part size must be greater than 0"); + } + if (number.longValue() > MAX_BUFFER_SIZE) { + throw new ConfigException(name, value, + "Part size must be no more: " + MAX_BUFFER_SIZE + " bytes (2GB)"); + } + } + }, Importance.MEDIUM, + "The Part Size in S3 Multi-part Uploads in bytes. Maximum is " + Integer.MAX_VALUE + + " (2GB) and default is " + DEFAULT_PART_SIZE + " (5MB)", + GROUP_AWS, awsS3SinkCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.NONE, AWS_S3_PART_SIZE); + } + + protected static void addAwsStsConfigGroup(final ConfigDef configDef) { + int awsStsGroupCounter = 0; + configDef.define(AWS_STS_ROLE_ARN, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), + ConfigDef.Importance.MEDIUM, "AWS STS Role", GROUP_AWS_STS, awsStsGroupCounter++, // NOPMD + // UnusedAssignment + ConfigDef.Width.NONE, AWS_STS_ROLE_ARN); + + configDef.define(AWS_STS_ROLE_SESSION_NAME, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), + Importance.MEDIUM, "AWS STS Session name", GROUP_AWS_STS, awsStsGroupCounter++, // NOPMD + // UnusedAssignment + ConfigDef.Width.NONE, AWS_STS_ROLE_SESSION_NAME); + + configDef.define(AWS_STS_ROLE_SESSION_DURATION, ConfigDef.Type.INT, 3600, + ConfigDef.Range.between(AwsStsRole.MIN_SESSION_DURATION, AwsStsRole.MAX_SESSION_DURATION), + Importance.MEDIUM, "AWS STS Session duration", GROUP_AWS_STS, awsStsGroupCounter++, // NOPMD + // UnusedAssignment + ConfigDef.Width.NONE, AWS_STS_ROLE_SESSION_DURATION); + + configDef.define(AWS_STS_ROLE_EXTERNAL_ID, ConfigDef.Type.STRING, null, new ConfigDef.NonEmptyString(), + Importance.MEDIUM, "AWS STS External Id", GROUP_AWS_STS, awsStsGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.NONE, AWS_STS_ROLE_EXTERNAL_ID); + + configDef.define(AWS_STS_CONFIG_ENDPOINT, ConfigDef.Type.STRING, AwsStsEndpointConfig.AWS_STS_GLOBAL_ENDPOINT, + new ConfigDef.NonEmptyString(), Importance.MEDIUM, "AWS STS Config Endpoint", GROUP_AWS_STS, + awsStsGroupCounter++, // NOPMD UnusedAssignment + ConfigDef.Width.NONE, AWS_STS_CONFIG_ENDPOINT); + } + + protected static void addDeprecatedConfiguration(final ConfigDef configDef) { + configDef.define(AWS_S3_PREFIX_CONFIG, Type.STRING, null, new ConfigDef.NonEmptyString(), Importance.MEDIUM, + "[Deprecated] Use `file.name.template` instead. Prefix for stored objects, e.g. cluster-1/", GROUP_AWS, + 0, ConfigDef.Width.NONE, AWS_S3_PREFIX_CONFIG); + + configDef.define(AWS_ACCESS_KEY_ID, Type.PASSWORD, null, new NonEmptyPassword() { + @Override + public void ensureValid(final String name, final Object value) { + LOGGER.info(AWS_ACCESS_KEY_ID + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, value); + } + }, Importance.MEDIUM, "AWS Access Key ID"); + + configDef.define(AWS_SECRET_ACCESS_KEY, Type.PASSWORD, null, new NonEmptyPassword() { + @Override + public void ensureValid(final String name, final Object value) { + LOGGER.info( + AWS_SECRET_ACCESS_KEY + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, value); + } + }, Importance.MEDIUM, "AWS Secret Access Key"); + + configDef.define(AWS_S3_BUCKET, Type.STRING, null, new BucketNameValidator() { + @Override + public void ensureValid(final String name, final Object object) { + LOGGER.info(AWS_S3_BUCKET + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, object); + } + }, Importance.MEDIUM, "AWS S3 Bucket name"); + + configDef.define(AWS_S3_ENDPOINT, Type.STRING, null, new UrlValidator() { + @Override + public void ensureValid(final String name, final Object object) { + LOGGER.info(AWS_S3_ENDPOINT + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, object); + } + }, Importance.LOW, "Explicit AWS S3 Endpoint Address, mainly for testing"); + + configDef.define(AWS_S3_REGION, Type.STRING, null, new AwsRegionValidator() { + @Override + public void ensureValid(final String name, final Object object) { + LOGGER.info(AWS_S3_REGION + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, object); + } + }, Importance.MEDIUM, "AWS S3 Region, e.g. us-east-1"); + + configDef.define(AWS_S3_PREFIX, Type.STRING, null, new ConfigDef.NonEmptyString() { + @Override + public void ensureValid(final String name, final Object object) { + LOGGER.info(AWS_S3_PREFIX + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, object); + } + }, Importance.MEDIUM, "Prefix for stored objects, e.g. cluster-1/"); + + configDef.define(OUTPUT_FIELDS, Type.LIST, null, new OutputFieldsValidator() { + @Override + public void ensureValid(final String name, final Object value) { + LOGGER.info(OUTPUT_FIELDS + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, value); + } + }, Importance.MEDIUM, + "Output fields. A comma separated list of one or more: " + OUTPUT_FIELD_NAME_KEY + ", " + + OUTPUT_FIELD_NAME_OFFSET + ", " + OUTPUT_FIELD_NAME_TIMESTAMP + ", " + OUTPUT_FIELD_NAME_VALUE + + ", " + OUTPUT_FIELD_NAME_HEADERS); + + configDef.define(OUTPUT_COMPRESSION, Type.STRING, null, new FileCompressionTypeValidator() { + @Override + public void ensureValid(final String name, final Object value) { + LOGGER.info(OUTPUT_COMPRESSION + " property is deprecated please read documentation for the new name"); + super.ensureValid(name, value); + } + }, Importance.MEDIUM, "Output compression. Valid values are: " + OUTPUT_COMPRESSION_TYPE_GZIP + " and " + + OUTPUT_COMPRESSION_TYPE_NONE); + } + + // Custom Validators + protected static class AwsRegionValidator implements ConfigDef.Validator { + private static final String SUPPORTED_AWS_REGIONS = Arrays.stream(Regions.values()) + .map(Regions::getName) + .collect(Collectors.joining(", ")); + + @Override + public void ensureValid(final String name, final Object value) { + if (Objects.nonNull(value)) { + final String valueStr = (String) value; + final Region region = RegionUtils.getRegion(valueStr); + if (!RegionUtils.getRegions().contains(region)) { + throw new ConfigException(name, valueStr, "supported values are: " + SUPPORTED_AWS_REGIONS); + } + } + } + } + + private static class BucketNameValidator implements ConfigDef.Validator { + @Override + public void ensureValid(final String name, final Object value) { + try { + if (value != null) { + BucketNameUtils.validateBucketName((String) value); + } + } catch (final IllegalArgumentException e) { + throw new ConfigException("Illegal bucket name: " + e.getMessage()); + } + } + } + + public AwsStsRole getStsRole() { + return new AwsStsRole(getString(AWS_STS_ROLE_ARN), getString(AWS_STS_ROLE_EXTERNAL_ID), + getString(AWS_STS_ROLE_SESSION_NAME), getInt(AWS_STS_ROLE_SESSION_DURATION)); + } + + public boolean hasAwsStsRole() { + return getStsRole().isValid(); + } + + public boolean hasStsEndpointConfig() { + return getStsEndpointConfig().isValid(); + } + + public AwsStsEndpointConfig getStsEndpointConfig() { + return new AwsStsEndpointConfig(getString(AWS_STS_CONFIG_ENDPOINT), getString(AWS_S3_REGION_CONFIG)); + } + + public AwsAccessSecret getNewAwsCredentials() { + return new AwsAccessSecret(getPassword(AWS_ACCESS_KEY_ID_CONFIG), getPassword(AWS_SECRET_ACCESS_KEY_CONFIG)); + } + + public AwsAccessSecret getAwsCredentials() { + return getNewAwsCredentials().isValid() ? getNewAwsCredentials() : getOldAwsCredentials(); + } + + public String getAwsS3EndPoint() { + return Objects.nonNull(getString(AWS_S3_ENDPOINT_CONFIG)) + ? getString(AWS_S3_ENDPOINT_CONFIG) + : getString(AWS_S3_ENDPOINT); + } + + public Region getAwsS3Region() { + // we have priority of properties if old one not set or both old and new one set + // the new property value will be selected + if (Objects.nonNull(getString(AWS_S3_REGION_CONFIG))) { + return RegionUtils.getRegion(getString(AWS_S3_REGION_CONFIG)); + } else if (Objects.nonNull(getString(AWS_S3_REGION))) { + return RegionUtils.getRegion(getString(AWS_S3_REGION)); + } else { + return RegionUtils.getRegion(Regions.US_EAST_1.getName()); + } + } + + public String getAwsS3BucketName() { + return Objects.nonNull(getString(AWS_S3_BUCKET_NAME_CONFIG)) + ? getString(AWS_S3_BUCKET_NAME_CONFIG) + : getString(AWS_S3_BUCKET); + } + + public String getServerSideEncryptionAlgorithmName() { + return getString(AWS_S3_SSE_ALGORITHM_CONFIG); + } + + public String getAwsS3Prefix() { + return Objects.nonNull(getString(AWS_S3_PREFIX_CONFIG)) + ? getString(AWS_S3_PREFIX_CONFIG) + : getString(AWS_S3_PREFIX); + } + + public int getAwsS3PartSize() { + return getInt(AWS_S3_PART_SIZE); + } + + public long getS3RetryBackoffDelayMs() { + return getLong(AWS_S3_RETRY_BACKOFF_DELAY_MS_CONFIG); + } + + public long getS3RetryBackoffMaxDelayMs() { + return getLong(AWS_S3_RETRY_BACKOFF_MAX_DELAY_MS_CONFIG); + } + + public int getS3RetryBackoffMaxRetries() { + return getInt(AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG); + } + + public AwsAccessSecret getOldAwsCredentials() { + return new AwsAccessSecret(getPassword(AWS_ACCESS_KEY_ID), getPassword(AWS_SECRET_ACCESS_KEY)); + } + + public AWSCredentialsProvider getCustomCredentialsProvider() { + return getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AWSCredentialsProvider.class); + } +} diff --git a/s3-connectors-common/src/main/java/io/aiven/kafka/connect/s3/S3ClientUtils.java b/s3-connectors-common/src/main/java/io/aiven/kafka/connect/s3/S3ClientUtils.java new file mode 100644 index 000000000..3664748a0 --- /dev/null +++ b/s3-connectors-common/src/main/java/io/aiven/kafka/connect/s3/S3ClientUtils.java @@ -0,0 +1,60 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.s3; + +import java.util.Objects; + +import io.aiven.kafka.connect.iam.AwsCredentialProviderFactory; + +import com.amazonaws.PredefinedClientConfigurations; +import com.amazonaws.client.builder.AwsClientBuilder; +import com.amazonaws.retry.PredefinedBackoffStrategies; +import com.amazonaws.retry.PredefinedRetryPolicies; +import com.amazonaws.retry.RetryPolicy; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; + +public class S3ClientUtils { + + private final AwsCredentialProviderFactory credentialFactory = new AwsCredentialProviderFactory(); + + public AmazonS3 createAmazonS3Client(final S3BaseConfig config) { + final var awsEndpointConfig = newEndpointConfiguration(config); + final var clientConfig = PredefinedClientConfigurations.defaultConfig() + .withRetryPolicy(new RetryPolicy(PredefinedRetryPolicies.DEFAULT_RETRY_CONDITION, + new PredefinedBackoffStrategies.FullJitterBackoffStrategy( + Math.toIntExact(config.getS3RetryBackoffDelayMs()), + Math.toIntExact(config.getS3RetryBackoffMaxDelayMs())), + config.getS3RetryBackoffMaxRetries(), false)); + final var s3ClientBuilder = AmazonS3ClientBuilder.standard() + .withCredentials(credentialFactory.getProvider(config)) + .withClientConfiguration(clientConfig); + if (Objects.isNull(awsEndpointConfig)) { + s3ClientBuilder.withRegion(config.getAwsS3Region().getName()); + } else { + s3ClientBuilder.withEndpointConfiguration(awsEndpointConfig).withPathStyleAccessEnabled(true); + } + return s3ClientBuilder.build(); + } + + private AwsClientBuilder.EndpointConfiguration newEndpointConfiguration(final S3BaseConfig config) { + if (Objects.isNull(config.getAwsS3EndPoint())) { + return null; + } + return new AwsClientBuilder.EndpointConfiguration(config.getAwsS3EndPoint(), config.getAwsS3Region().getName()); + } +} diff --git a/s3-connectors-common/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java b/s3-connectors-common/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java new file mode 100644 index 000000000..3ce925668 --- /dev/null +++ b/s3-connectors-common/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java @@ -0,0 +1,81 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.iam; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.HashMap; +import java.util.Map; + +import io.aiven.kafka.connect.s3.S3BaseConfig; +import io.aiven.kafka.connect.tools.AwsCredentialConfig; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; +import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider; +import com.amazonaws.regions.Regions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +final class AwsCredentialProviderFactoryTest { + private AwsCredentialProviderFactory factory; + private Map props; + + @BeforeEach + public void setUp() { + factory = new AwsCredentialProviderFactory(); + props = new HashMap<>(); + props.put(S3BaseConfig.AWS_S3_BUCKET_NAME_CONFIG, "anyBucket"); + props.put(S3BaseConfig.AWS_S3_BUCKET_NAME_CONFIG, "any-bucket"); + } + + @Test + void createsStsCredentialProviderIfSpecified() { + + props.put(S3BaseConfig.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah"); + props.put(S3BaseConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah"); + props.put(S3BaseConfig.AWS_STS_ROLE_ARN, "arn:aws:iam::12345678910:role/S3SinkTask"); + props.put(S3BaseConfig.AWS_STS_ROLE_SESSION_NAME, "SESSION_NAME"); + props.put(S3BaseConfig.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName()); + props.put(S3BaseConfig.AWS_STS_CONFIG_ENDPOINT, "https://sts.us-east-1.amazonaws.com"); + + final var config = new AwsCredentialConfig(props); + + final var credentialProvider = factory.getProvider(config); + assertThat(credentialProvider).isInstanceOf(STSAssumeRoleSessionCredentialsProvider.class); + } + + @Test + void createStaticCredentialProviderByDefault() { + props.put(S3BaseConfig.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah"); + props.put(S3BaseConfig.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah"); + + final var config = new AwsCredentialConfig(props); + + final var credentialProvider = factory.getProvider(config); + assertThat(credentialProvider).isInstanceOf(AWSStaticCredentialsProvider.class); + } + + @Test + void createDefaultCredentialsWhenNoCredentialsSpecified() { + final var config = new AwsCredentialConfig(props); + + final var credentialProvider = factory.getProvider(config); + assertThat(credentialProvider).isInstanceOf(DefaultAWSCredentialsProviderChain.class); + } + +} diff --git a/s3-connectors-common/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialConfig.java b/s3-connectors-common/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialConfig.java new file mode 100644 index 000000000..95f7d84e7 --- /dev/null +++ b/s3-connectors-common/src/test/java/io/aiven/kafka/connect/tools/AwsCredentialConfig.java @@ -0,0 +1,38 @@ +/* + * Copyright 2024 Aiven Oy + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.tools; + +import java.util.Map; + +import org.apache.kafka.common.config.ConfigDef; + +import io.aiven.kafka.connect.s3.S3BaseConfig; + +public class AwsCredentialConfig extends S3BaseConfig { + public AwsCredentialConfig(final Map properties) { + super(configDef(new ConfigDef()), handleDeprecatedYyyyUppercase(properties)); + } + + public static ConfigDef configDef(ConfigDef configDef) { // NOPMD UnusedAssignment + addS3RetryPolicies(configDef); + addAwsConfigGroup(configDef); + addAwsStsConfigGroup(configDef); + addDeprecatedConfiguration(configDef); + addS3SinkConfig(configDef); + return configDef; + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index bcb4435e3..cee8484cd 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -91,6 +91,8 @@ dependencyResolutionManagement { include("commons") +include("s3-connectors-common") + include("gcs-sink-connector") include("s3-sink-connector")