Skip to content

Commit

Permalink
Initial addition of common functionaity between S3 sink and source co…
Browse files Browse the repository at this point in the history
…nnectors

Signed-off-by: Aindriu Lavelle <[email protected]>
  • Loading branch information
aindriu-aiven committed Oct 9, 2024
1 parent 939796f commit d41fe78
Show file tree
Hide file tree
Showing 13 changed files with 1,063 additions and 0 deletions.
12 changes: 12 additions & 0 deletions s3-connectors-common/.gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#
# https://help.github.com/articles/dealing-with-line-endings/
#
# Linux start script should use lf
/gradlew text eol=lf

# These are Windows script files and should use crlf
*.bat text eol=crlf

# Binary files should be left untouched
*.jar binary

12 changes: 12 additions & 0 deletions s3-connectors-common/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
target
docs/_build
.idea
*.iml
*.DS_Store
.gradle
.gradletasknamecache
build/
rpm/
rpmbuild/
lib/
*.sh
11 changes: 11 additions & 0 deletions s3-connectors-common/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Aiven's Common Module for S3 Connectors for Apache Kafka®

Shared common functionality for S3 connectivity and configuration

# License

This project is licensed under the [Apache License, Version 2.0](../LICENSE).

# Trademarks

Apache Kafka, Apache Kafka Connect and Apache Maven are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.
172 changes: 172 additions & 0 deletions s3-connectors-common/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
plugins { id("aiven-apache-kafka-connectors-all.java-conventions") }

val amazonS3Version by extra("1.12.729")
val amazonSTSVersion by extra("1.12.729")
val s3mockVersion by extra("0.2.6")

dependencies {
implementation("com.amazonaws:aws-java-sdk-s3:$amazonS3Version")
implementation("com.amazonaws:aws-java-sdk-sts:$amazonSTSVersion")

implementation(project(":commons"))

compileOnly(apache.kafka.connect.api)
compileOnly(apache.kafka.connect.runtime)
compileOnly(apache.kafka.connect.json)

implementation(confluent.kafka.connect.avro.data) {
exclude(group = "org.apache.kafka", module = "kafka-clients")
}

implementation(tools.spotbugs.annotations)
implementation(compressionlibs.snappy)
implementation(compressionlibs.zstd.jni)

implementation(logginglibs.slf4j)

implementation(apache.commons.text)

implementation(apache.parquet.avro) {
exclude(group = "org.xerial.snappy", module = "snappy-java")
exclude(group = "org.slf4j", module = "slf4j-api")
exclude(group = "org.apache.avro", module = "avro")
}
implementation(apache.hadoop.common) {
exclude(group = "org.apache.hadoop.thirdparty", module = "hadoop-shaded-protobuf_3_7")
exclude(group = "com.google.guava", module = "guava")
exclude(group = "commons-cli", module = "commons-cli")
exclude(group = "org.apache.commons", module = "commons-math3")
exclude(group = "org.apache.httpcomponents", module = "httpclient")
exclude(group = "commons-codec", module = "commons-codec")
exclude(group = "commons-io", module = "commons-io")
exclude(group = "commons-net", module = "commons-net")
exclude(group = "org.eclipse.jetty")
exclude(group = "org.eclipse.jetty.websocket")
exclude(group = "javax.servlet")
exclude(group = "javax.servlet.jsp")
exclude(group = "javax.activation")
exclude(group = "com.sun.jersey")
exclude(group = "log4j")
exclude(group = "org.apache.commons", module = "commons-text")
exclude(group = "org.slf4j", module = "slf4j-api")
exclude(group = "org.apache.hadoop", module = "hadoop-auth")
exclude(group = "org.apache.hadoop", module = "hadoop-yarn-api")
exclude(group = "com.google.re2j")
exclude(group = "com.google.protobuf")
exclude(group = "com.google.code.gson")
exclude(group = "com.jcraft")
exclude(group = "org.apache.curator")
exclude(group = "org.apache.zookeeper")
exclude(group = "org.apache.htrace")
exclude(group = "com.google.code.findbugs")
exclude(group = "org.apache.kerby")
exclude(group = "com.fasterxml.jackson.core")
exclude(group = "com.fasterxml.woodstox", module = "woodstox-core:5.0.3")
exclude(group = "org.apache.avro", module = "avro")
exclude(group = "org.apache.hadoop", module = "hadoop-yarn-common")
exclude(group = "com.google.inject.extensions", module = "guice-servlet")
exclude(group = "io.netty", module = "netty")
}

testImplementation(apache.kafka.connect.api)
testImplementation(apache.kafka.connect.runtime)
testImplementation(apache.kafka.connect.json)
testImplementation(testinglibs.junit.jupiter)
testImplementation(apache.parquet.tools) { exclude(group = "org.slf4j", module = "slf4j-api") }
testImplementation(jackson.databind)
testImplementation(testinglibs.mockito.core)
testImplementation(testinglibs.assertj.core)

testImplementation(testinglibs.woodstox.stax2.api)
testImplementation(apache.hadoop.mapreduce.client.core)
testImplementation(confluent.kafka.connect.avro.converter)

testRuntimeOnly(testinglibs.junit.jupiter.engine)
testRuntimeOnly(logginglibs.logback.classic)
}

tasks.withType<Jar> { archiveBaseName.set(project.name + "-for-apache-kafka-connect") }

distributions { main { distributionBaseName.set(project.name + "-for-apache-kafka-connect") } }

publishing {
publications {
create<MavenPublication>("publishMavenJavaArtifact") {
groupId = group.toString()
artifactId = "s3-connectors-common-for-apache-kafka-connect"
version = version.toString()

from(components["java"])

pom {
name = "Aiven's Common Module for Apache Kafka connectors"
description = "Aiven's Common Module for Apache Kafka connectors"
url = "https://github.com/Aiven-Open/cloud-storage-connectors-for-apache-kafka"
organization {
name = "Aiven Oy"
url = "https://aiven.io"
}

licenses {
license {
name = "Apache 2.0"
url = "http://www.apache.org/licenses/LICENSE-2.0"
distribution = "repo"
}
}

developers {
developer {
id = "aiven"
name = "Aiven Opensource"
email = "[email protected]"
}
}

scm {
connection =
"scm:git:git://github.com:Aiven-Open/cloud-storage-connectors-for-apache-kafka.git"
developerConnection =
"scm:git:ssh://github.com:Aiven-Open/cloud-storage-connectors-for-apache-kafka.git"
url = "https://github.com/Aiven-Open/cloud-storage-connectors-for-apache-kafka"
}
}
}
}

repositories {
maven {
name = "sonatype"

val releasesRepoUrl = uri("https://oss.sonatype.org/service/local/staging/deploy/maven2")
val snapshotsRepoUrl = uri("https://oss.sonatype.org/content/repositories/snapshots")
url = if (version.toString().endsWith("SNAPSHOT")) snapshotsRepoUrl else releasesRepoUrl

credentials(PasswordCredentials::class)
}
}
}

signing {
sign(publishing.publications["publishMavenJavaArtifact"])
useGpgCmd()
// Some issue in the plugin:
// GPG outputs already armored signatures. The plugin also does armoring for `asc` files.
// This results in double armored signatures, i.e. garbage.
// Override the signature type provider to use unarmored output for `asc` files, which works well
// with GPG.
class ASCSignatureProvider : AbstractSignatureTypeProvider() {
val binary =
object : BinarySignatureType() {
override fun getExtension(): String {
return "asc"
}
}

init {
register(binary)
setDefaultType(binary.extension)
}
}
signatureTypes = ASCSignatureProvider()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.iam;

import java.util.Objects;

import org.apache.kafka.common.config.types.Password;

public final class AwsAccessSecret {
private final Password accessKeyId;
private final Password secretAccessKey;

public AwsAccessSecret(final Password accessKeyId, final Password secretAccessKey) {
this.accessKeyId = accessKeyId;
this.secretAccessKey = secretAccessKey;
}

public Password getAccessKeyId() {
return accessKeyId;
}

public Password getSecretAccessKey() {
return secretAccessKey;
}

public Boolean isValid() {
return Objects.nonNull(accessKeyId) && Objects.nonNull(secretAccessKey);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.iam;

import io.aiven.kafka.connect.s3.S3BaseConfig;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;

public class AwsCredentialProviderFactory {

public AWSCredentialsProvider getProvider(final S3BaseConfig config) {
if (config.hasAwsStsRole()) {
return getStsProvider(config);
}
final AwsAccessSecret awsCredentials = config.getAwsCredentials();
if (!awsCredentials.isValid()) {
return config.getCustomCredentialsProvider();
}
return new AWSStaticCredentialsProvider(new BasicAWSCredentials(awsCredentials.getAccessKeyId().value(),
awsCredentials.getSecretAccessKey().value()));
}

private AWSCredentialsProvider getStsProvider(final S3BaseConfig config) {
final AwsStsRole awsstsRole = config.getStsRole();
final AWSSecurityTokenService sts = securityTokenService(config);
return new STSAssumeRoleSessionCredentialsProvider.Builder(awsstsRole.getArn(), awsstsRole.getSessionName())
.withStsClient(sts)
.withExternalId(awsstsRole.getExternalId())
.withRoleSessionDurationSeconds(awsstsRole.getSessionDurationSeconds())
.build();
}

private AWSSecurityTokenService securityTokenService(final S3BaseConfig config) {
if (config.hasStsEndpointConfig()) {
final AwsStsEndpointConfig endpointConfig = config.getStsEndpointConfig();
final AwsClientBuilder.EndpointConfiguration stsConfig = new AwsClientBuilder.EndpointConfiguration(
endpointConfig.getServiceEndpoint(), endpointConfig.getSigningRegion());
final AWSSecurityTokenServiceClientBuilder stsBuilder = AWSSecurityTokenServiceClientBuilder.standard();
stsBuilder.setEndpointConfiguration(stsConfig);
return stsBuilder.build();
}
return AWSSecurityTokenServiceClientBuilder.defaultClient();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2024 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.iam;

import java.util.Objects;

public final class AwsStsEndpointConfig {
public static final String AWS_STS_GLOBAL_ENDPOINT = "https://sts.amazonaws.com";

private final String serviceEndpoint;
private final String signingRegion;

public AwsStsEndpointConfig(final String serviceEndpoint, final String signingRegion) {
this.serviceEndpoint = serviceEndpoint;
this.signingRegion = signingRegion;
}

public String getServiceEndpoint() {
return serviceEndpoint;
}

public String getSigningRegion() {
return signingRegion;
}

public Boolean isValid() {
return Objects.nonNull(signingRegion);
}
}
Loading

0 comments on commit d41fe78

Please sign in to comment.