Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-601] Run KinesisIOIT with localstack #12422

Merged
merged 3 commits into from
Aug 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ task javaPostCommit() {
dependsOn ":sdks:java:extensions:google-cloud-platform-core:postCommit"
dependsOn ":sdks:java:extensions:zetasketch:postCommit"
dependsOn ":sdks:java:io:google-cloud-platform:postCommit"
dependsOn ":sdks:java:io:kinesis:integrationTest"
}

task sqlPostCommit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ class BeamModulePlugin implements Plugin<Project> {
def quickcheck_version = "0.8"
def spark_version = "2.4.6"
def spotbugs_version = "4.0.6"
def testcontainers_localstack_version = "1.14.3"

// A map of maps containing common libraries used per language. To use:
// dependencies {
Expand Down Expand Up @@ -564,6 +565,7 @@ class BeamModulePlugin implements Plugin<Project> {
spark_sql : "org.apache.spark:spark-sql_2.11:$spark_version",
spark_streaming : "org.apache.spark:spark-streaming_2.11:$spark_version",
stax2_api : "org.codehaus.woodstox:stax2-api:3.1.4",
testcontainers_localstack : "org.testcontainers:localstack:$testcontainers_localstack_version",
vendored_bytebuddy_1_10_8 : "org.apache.beam:beam-vendor-bytebuddy-1_10_8:0.1",
vendored_grpc_1_26_0 : "org.apache.beam:beam-vendor-grpc-1_26_0:0.3",
vendored_guava_26_0_jre : "org.apache.beam:beam-vendor-guava-26_0-jre:0.1",
Expand Down
2 changes: 1 addition & 1 deletion sdks/java/io/amazon-web-services/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ dependencies {
testCompile library.java.hamcrest_library
testCompile library.java.mockito_core
testCompile library.java.junit
testCompile library.java.testcontainers_localstack
testCompile "org.assertj:assertj-core:3.11.1"
testCompile 'org.elasticmq:elasticmq-rest-sqs_2.12:0.14.1'
testCompile 'org.testcontainers:localstack:1.11.2'
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
Expand Down
2 changes: 1 addition & 1 deletion sdks/java/io/amazon-web-services2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ dependencies {
testCompile library.java.hamcrest_library
testCompile library.java.powermock
testCompile library.java.powermock_mockito
testCompile library.java.testcontainers_localstack
testCompile "org.assertj:assertj-core:3.11.1"
testCompile 'org.testcontainers:testcontainers:1.11.3'
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
Expand Down
1 change: 1 addition & 0 deletions sdks/java/io/kinesis/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ dependencies {
testCompile library.java.hamcrest_library
testCompile library.java.powermock
testCompile library.java.powermock_mockito
testCompile library.java.testcontainers_localstack
testCompile "org.assertj:assertj-core:3.11.1"
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.amazonaws.services.kinesis.producer.IKinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import java.net.URI;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Basic implementation of {@link AWSClientsProvider} used by default in {@link KinesisIO}. */
Expand All @@ -39,16 +40,27 @@ class BasicKinesisProvider implements AWSClientsProvider {
private final String secretKey;
private final Regions region;
private final @Nullable String serviceEndpoint;
private final boolean verifyCertificate;

BasicKinesisProvider(
String accessKey, String secretKey, Regions region, @Nullable String serviceEndpoint) {
String accessKey,
String secretKey,
Regions region,
@Nullable String serviceEndpoint,
boolean verifyCertificate) {
checkArgument(accessKey != null, "accessKey can not be null");
checkArgument(secretKey != null, "secretKey can not be null");
checkArgument(region != null, "region can not be null");
this.accessKey = accessKey;
this.secretKey = secretKey;
this.region = region;
this.serviceEndpoint = serviceEndpoint;
this.verifyCertificate = verifyCertificate;
}

BasicKinesisProvider(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why you decided to modify BasicKinesisProvider and not just create a new provider class for testing that extends this basic one?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Service endpoint can be configured in the KinesisIO and I thought that omitting it in the producer configuration was a bug.
Certificate verification is needed for cross-language tests (PR 12297), we would need to use this TestKinesisProvider if verify_certificate was provided there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that service endpoint is used in com.amazonaws.services.kinesis.producer.KinesisProducer which is actually used for KinesisIO.Write. Am I mistaken?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When you use KinesisIO.write()..withAWSClientsProvider("accesskey", "secretkey", "region", "serviceEndpointUrl") then without my modification beam will try to use real AWS url instead of the given "serviceEndpointUrl". I don't think this is the desired behaviour, but correct me if I'm wrong or I don't understand something (which is very probable, I haven't used AWS much in my life).

String accessKey, String secretKey, Regions region, @Nullable String serviceEndpoint) {
this(accessKey, secretKey, region, serviceEndpoint, true);
}

private AWSCredentialsProvider getCredentialsProvider() {
Expand Down Expand Up @@ -85,6 +97,12 @@ public AmazonCloudWatch getCloudWatchClient() {
public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration config) {
config.setRegion(region.getName());
config.setCredentialsProvider(getCredentialsProvider());
if (serviceEndpoint != null) {
URI uri = URI.create(serviceEndpoint);
config.setKinesisEndpoint(uri.getHost());
config.setKinesisPort(uri.getPort());
}
config.setVerifyCertificate(verifyCertificate);
return new KinesisProducer(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,28 @@ public Read withAWSClientsProvider(
new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
}

/**
* Specify credential details and region to be used to read from Kinesis. If you need more
* sophisticated credential protocol, then you should look at {@link
* Read#withAWSClientsProvider(AWSClientsProvider)}.
*
* <p>The {@code serviceEndpoint} sets an alternative service host. This is useful to execute
* the tests with Kinesis service emulator.
*
* <p>The {@code verifyCertificate} disables or enables certificate verification. Never set it
* to false in production.
*/
public Read withAWSClientsProvider(
String awsAccessKey,
String awsSecretKey,
Regions region,
String serviceEndpoint,
boolean verifyCertificate) {
return withAWSClientsProvider(
new BasicKinesisProvider(
awsAccessKey, awsSecretKey, region, serviceEndpoint, verifyCertificate));
}

/** Specifies to read at most a given number of records. */
public Read withMaxNumRecords(long maxNumRecords) {
checkArgument(
Expand Down Expand Up @@ -670,6 +692,28 @@ public Write withAWSClientsProvider(
new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
}

/**
* Specify credential details and region to be used to write to Kinesis. If you need more
* sophisticated credential protocol, then you should look at {@link
* Write#withAWSClientsProvider(AWSClientsProvider)}.
*
* <p>The {@code serviceEndpoint} sets an alternative service host. This is useful to execute
* the tests with Kinesis service emulator.
*
* <p>The {@code verifyCertificate} disables or enables certificate verification. Never set it
* to false in production.
*/
public Write withAWSClientsProvider(
String awsAccessKey,
String awsSecretKey,
Regions region,
String serviceEndpoint,
boolean verifyCertificate) {
return withAWSClientsProvider(
new BasicKinesisProvider(
awsAccessKey, awsSecretKey, region, serviceEndpoint, verifyCertificate));
}

/**
* Specify the number of retries that will be used to flush the outstanding records in case if
* they were not flushed from the first time. Default number of retries is {@code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@
*/
package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.SDKGlobalConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
Expand All @@ -35,33 +42,51 @@
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.testcontainers.containers.localstack.LocalStackContainer;

/**
* Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
* KinesisTestOptions} in order to run this.
* KinesisTestOptions} in order to run this if you want to test it with production setup. By default
* when no options are provided an instance of localstack is used.
*/
@RunWith(JUnit4.class)
public class KinesisIOIT implements Serializable {
private static int numberOfShards;
private static int numberOfRows;
private static final String LOCALSTACK_VERSION = "0.11.3";
aromanenko-dev marked this conversation as resolved.
Show resolved Hide resolved

@Rule public TestPipeline pipelineWrite = TestPipeline.create();
@Rule public TestPipeline pipelineRead = TestPipeline.create();

private static KinesisTestOptions options;
private static final Instant now = Instant.now();
pjotrekk marked this conversation as resolved.
Show resolved Hide resolved

private static AmazonKinesis kinesisClient;
private static LocalStackContainer localstackContainer;
private static Instant now = Instant.now();

@BeforeClass
public static void setup() {
public static void setup() throws Exception {
PipelineOptionsFactory.register(KinesisTestOptions.class);
options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
numberOfShards = options.getNumberOfShards();
numberOfRows = options.getNumberOfRecords();
pjotrekk marked this conversation as resolved.
Show resolved Hide resolved
if (options.getUseLocalstack()) {
setupLocalstack();
kinesisClient = createKinesisClient();
createStream(options.getAwsKinesisStream());
}
}

@AfterClass
public static void teardown() {
if (options.getUseLocalstack()) {
kinesisClient.deleteStream(options.getAwsKinesisStream());
System.clearProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY);
System.clearProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY);
localstackContainer.stop();
}
}

/** Test which write and then read data for a Kinesis stream. */
Expand All @@ -74,7 +99,7 @@ public void testWriteThenRead() {
/** Write test dataset into Kinesis stream. */
private void runWrite() {
pipelineWrite
.apply("Generate Sequence", GenerateSequence.from(0).to((long) numberOfRows))
.apply("Generate Sequence", GenerateSequence.from(0).to(options.getNumberOfRecords()))
.apply("Prepare TestRows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
.apply("Prepare Kinesis input records", ParDo.of(new ConvertToBytes()))
.apply(
Expand All @@ -85,7 +110,9 @@ private void runWrite() {
.withAWSClientsProvider(
options.getAwsAccessKey(),
options.getAwsSecretKey(),
Regions.fromName(options.getAwsKinesisRegion())));
Regions.fromName(options.getAwsKinesisRegion()),
options.getAwsServiceEndpoint(),
options.getAwsVerifyCertificate()));

pipelineWrite.run().waitUntilFinish();
}
Expand All @@ -99,28 +126,100 @@ private void runRead() {
.withAWSClientsProvider(
options.getAwsAccessKey(),
options.getAwsSecretKey(),
Regions.fromName(options.getAwsKinesisRegion()))
.withMaxNumRecords(numberOfRows)
Regions.fromName(options.getAwsKinesisRegion()),
options.getAwsServiceEndpoint(),
options.getAwsVerifyCertificate())
.withMaxNumRecords(options.getNumberOfRecords())
// to prevent endless running in case of error
.withMaxReadTime(Duration.standardMinutes(10))
.withMaxReadTime(Duration.standardMinutes(10L))
.withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
.withInitialTimestampInStream(now)
.withRequestRecordsLimit(1000));

PAssert.thatSingleton(output.apply("Count All", Count.globally()))
.isEqualTo((long) numberOfRows);
.isEqualTo((long) options.getNumberOfRecords());

PCollection<String> consolidatedHashcode =
output
.apply(ParDo.of(new ExtractDataValues()))
.apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());

PAssert.that(consolidatedHashcode)
.containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
.containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));

pipelineRead.run().waitUntilFinish();
}

/** Necessary setup for localstack environment. */
private static void setupLocalstack() {
// For some unclear reason localstack requires a timestamp in seconds
now = Instant.ofEpochMilli(Long.divideUnsigned(now.getMillis(), 1000L));

System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");

localstackContainer =
new LocalStackContainer(LOCALSTACK_VERSION)
.withServices(LocalStackContainer.Service.KINESIS)
.withEnv("USE_SSL", "true")
.withStartupAttempts(3);
localstackContainer.start();

options.setAwsServiceEndpoint(
localstackContainer
.getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
.getServiceEndpoint()
.replace("http", "https"));
options.setAwsKinesisRegion(
localstackContainer
.getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
.getSigningRegion());
options.setAwsAccessKey(
localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId());
options.setAwsSecretKey(
localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey());
options.setNumberOfRecords(1000);
options.setNumberOfShards(1);
options.setAwsKinesisStream("beam_kinesis_test");
options.setAwsVerifyCertificate(false);
}

private static AmazonKinesis createKinesisClient() {
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();

AWSCredentialsProvider credentialsProvider =
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey()));
clientBuilder.setCredentials(credentialsProvider);

if (options.getAwsServiceEndpoint() != null) {
AwsClientBuilder.EndpointConfiguration endpointConfiguration =
new AwsClientBuilder.EndpointConfiguration(
options.getAwsServiceEndpoint(), options.getAwsKinesisRegion());
clientBuilder.setEndpointConfiguration(endpointConfiguration);
} else {
clientBuilder.setRegion(options.getAwsKinesisRegion());
}

return clientBuilder.build();
}

private static void createStream(String streamName) throws Exception {
kinesisClient.createStream(streamName, 1);
int repeats = 10;
for (int i = 0; i <= repeats; ++i) {
String streamStatus =
kinesisClient.describeStream(streamName).getStreamDescription().getStreamStatus();
if ("ACTIVE".equals(streamStatus)) {
break;
}
if (i == repeats) {
throw new RuntimeException("Unable to initialize stream");
}
Thread.sleep(1000L);
}
}

/** Produces test rows. */
private static class ConvertToBytes extends DoFn<TestRow, byte[]> {
@ProcessElement
Expand All @@ -141,7 +240,7 @@ private static final class RandomPartitioner implements KinesisPartitioner {
@Override
public String getPartitionKey(byte[] value) {
Random rand = new Random();
int n = rand.nextInt(numberOfShards) + 1;
int n = rand.nextInt(options.getNumberOfShards()) + 1;
return String.valueOf(n);
}

Expand Down
Loading