Skip to content

Commit

Permalink
Update xlang kinesis to v2 (#33416)
Browse files Browse the repository at this point in the history
* [WIP] Update xlang kinesis to v2

* cleanup

* Add missed file

* Fix up

* Fix up

* Fix up

* fix

* fmt

* Fix test

* lint

* Add serializer

* Add serializer

* Allow configuration to be serialized

* Allow configuration to be serialized

* Allow configuration to be serialized

* Allow configuration to be serialized

* debug info

* debug info

* debug info

* debug info

* debug info

* debug info

* Allow writebuilder to be serialized

* Try skipping certs

* Make sure it gets set for now

* put behind flag

* Doc + debug further

* Merge in master

* Debug info

* Pass through param

* Remove debug

* Remove debug

* override trust manager

* checkstyle

* Try disabling aggregation

* easier debugging

* Try upgrading localstack

* change how containers are started

* change how containers are started

* force http1

* Add back all tests

* Update changes wording

* Better change description
  • Loading branch information
damccorm authored Dec 30, 2024
1 parent df0ee41 commit 9b09eda
Show file tree
Hide file tree
Showing 10 changed files with 451 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 7
"modification": 8
}

2 changes: 1 addition & 1 deletion CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@

## Breaking Changes

* X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
* AWS V1 I/Os have been removed (Java). As part of this, x-lang Python Kinesis I/O has been updated to consume the V2 IO and it also no longer supports setting producer_properties ([#33430](https://github.com/apache/beam/issues/33430)).

## Deprecations

Expand Down
39 changes: 39 additions & 0 deletions sdks/java/io/amazon-web-services2/expansion-service/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

apply plugin: 'org.apache.beam.module'
apply plugin: 'application'
mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"

applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.amazon-web-services2.expansion.service',
exportJavadoc: false,
validateShadowJar: false,
shadowClosure: {},
)

description = "Apache Beam :: SDKs :: Java :: IO :: Amazon Web Services 2 :: Expansion Service"
ext.summary = "Expansion service serving AWS2"

dependencies {
implementation project(":sdks:java:expansion-service")
permitUnusedDeclared project(":sdks:java:expansion-service")
implementation project(":sdks:java:io:amazon-web-services2")
permitUnusedDeclared project(":sdks:java:io:amazon-web-services2")
runtimeOnly library.java.slf4j_jdk14
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@

import java.io.Serializable;
import java.net.URI;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.time.Duration;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.apache.beam.sdk.io.aws2.options.AwsOptions;
import org.apache.beam.sdk.util.InstanceBuilder;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand All @@ -37,9 +41,12 @@
import software.amazon.awssdk.core.client.builder.SdkSyncClientBuilder;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.retry.RetryPolicy;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.TlsTrustManagersProvider;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.http.apache.ProxyConfiguration;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.internal.http.NoneTlsKeyManagersProvider;
import software.amazon.awssdk.regions.Region;

/**
Expand Down Expand Up @@ -113,6 +120,32 @@ static <BuilderT extends AwsClientBuilder<BuilderT, ClientT>, ClientT> ClientT b
return ClientBuilderFactory.getFactory(options).create(builder, config, options).build();
}

/** Trust provider to skip certificate verification. Should only be used for test pipelines. */
class SkipCertificateVerificationTrustManagerProvider implements TlsTrustManagersProvider {
public SkipCertificateVerificationTrustManagerProvider() {}

@Override
public TrustManager[] trustManagers() {
TrustManager tm =
new X509TrustManager() {
@Override
public void checkClientTrusted(X509Certificate[] x509CertificateArr, String str)
throws CertificateException {}

@Override
public void checkServerTrusted(X509Certificate[] x509CertificateArr, String str)
throws CertificateException {}

@Override
public X509Certificate[] getAcceptedIssuers() {
return new X509Certificate[0];
}
};
TrustManager[] tms = {tm};
return tms;
}
}

/**
* Default implementation of {@link ClientBuilderFactory}. This implementation can configure both,
* synchronous clients using {@link ApacheHttpClient} as well as asynchronous clients using {@link
Expand Down Expand Up @@ -161,7 +194,11 @@ public <BuilderT extends AwsClientBuilder<BuilderT, ClientT>, ClientT> BuilderT

HttpClientConfiguration httpConfig = options.getHttpClientConfiguration();
ProxyConfiguration proxyConfig = options.getProxyConfiguration();
if (proxyConfig != null || httpConfig != null) {
boolean skipCertificateVerification = false;
if (config.skipCertificateVerification() != null) {
skipCertificateVerification = config.skipCertificateVerification();
}
if (proxyConfig != null || httpConfig != null || skipCertificateVerification) {
if (builder instanceof SdkSyncClientBuilder) {
ApacheHttpClient.Builder client = syncClientBuilder();

Expand All @@ -177,6 +214,11 @@ public <BuilderT extends AwsClientBuilder<BuilderT, ClientT>, ClientT> BuilderT
setOptional(httpConfig.maxConnections(), client::maxConnections);
}

if (skipCertificateVerification) {
client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance());
client.tlsTrustManagersProvider(new SkipCertificateVerificationTrustManagerProvider());
}

// must use builder to make sure client is managed by the SDK
((SdkSyncClientBuilder<?, ?>) builder).httpClientBuilder(client);
} else if (builder instanceof SdkAsyncClientBuilder) {
Expand All @@ -201,6 +243,12 @@ public <BuilderT extends AwsClientBuilder<BuilderT, ClientT>, ClientT> BuilderT
setOptional(httpConfig.maxConnections(), client::maxConcurrency);
}

if (skipCertificateVerification) {
client.tlsKeyManagersProvider(NoneTlsKeyManagersProvider.getInstance());
client.tlsTrustManagersProvider(new SkipCertificateVerificationTrustManagerProvider());
client.protocol(Protocol.HTTP1_1);
}

// must use builder to make sure client is managed by the SDK
((SdkAsyncClientBuilder<?, ?>) builder).httpClientBuilder(client);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ public abstract class ClientConfiguration implements Serializable {
return regionId() != null ? Region.of(regionId()) : null;
}

/**
* Optional flag to skip certificate verification. Should only be overriden for test scenarios. If
* set, this overwrites the default in {@link AwsOptions#skipCertificateVerification()}.
*/
@JsonProperty
public abstract @Nullable @Pure Boolean skipCertificateVerification();

/**
* Optional service endpoint to use AWS compatible services instead, e.g. for testing. If set,
* this overwrites the default in {@link AwsOptions#getEndpoint()}.
Expand Down Expand Up @@ -156,6 +163,13 @@ public Builder retry(Consumer<RetryConfiguration.Builder> retry) {
return retry(builder.build());
}

/**
* Optional flag to skip certificate verification. Should only be overriden for test scenarios.
* If set, this overwrites the default in {@link AwsOptions#skipCertificateVerification()}.
*/
@JsonProperty
public abstract Builder skipCertificateVerification(boolean skipCertificateVerification);

abstract Builder regionId(String region);

abstract Builder credentialsProviderAsJson(String credentialsProvider);
Expand Down
Loading

0 comments on commit 9b09eda

Please sign in to comment.