Skip to content

Commit

Permalink
[BEAM-601] Run KinesisIOIT with localstack (#12422)
Browse files Browse the repository at this point in the history
* [BEAM-601] Run KinesisIOIT withtestcontainers with localstack

* [BEAM-601] Add kinesis integration test to Java postcommit

* Fixes after Alexey's code review
  • Loading branch information
Piotr Szuberski authored Aug 11, 2020
1 parent 3e7371e commit dfadde2
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 18 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ task javaPostCommit() {
dependsOn ":sdks:java:extensions:google-cloud-platform-core:postCommit"
dependsOn ":sdks:java:extensions:zetasketch:postCommit"
dependsOn ":sdks:java:io:google-cloud-platform:postCommit"
dependsOn ":sdks:java:io:kinesis:integrationTest"
}

task sqlPostCommit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ class BeamModulePlugin implements Plugin<Project> {
def quickcheck_version = "0.8"
def spark_version = "2.4.6"
def spotbugs_version = "4.0.6"
def testcontainers_localstack_version = "1.14.3"

// A map of maps containing common libraries used per language. To use:
// dependencies {
Expand Down Expand Up @@ -564,6 +565,7 @@ class BeamModulePlugin implements Plugin<Project> {
spark_sql : "org.apache.spark:spark-sql_2.11:$spark_version",
spark_streaming : "org.apache.spark:spark-streaming_2.11:$spark_version",
stax2_api : "org.codehaus.woodstox:stax2-api:3.1.4",
testcontainers_localstack : "org.testcontainers:localstack:$testcontainers_localstack_version",
vendored_bytebuddy_1_10_8 : "org.apache.beam:beam-vendor-bytebuddy-1_10_8:0.1",
vendored_grpc_1_26_0 : "org.apache.beam:beam-vendor-grpc-1_26_0:0.3",
vendored_guava_26_0_jre : "org.apache.beam:beam-vendor-guava-26_0-jre:0.1",
Expand Down
2 changes: 1 addition & 1 deletion sdks/java/io/amazon-web-services/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ dependencies {
testCompile library.java.hamcrest_library
testCompile library.java.mockito_core
testCompile library.java.junit
testCompile library.java.testcontainers_localstack
testCompile "org.assertj:assertj-core:3.11.1"
testCompile 'org.elasticmq:elasticmq-rest-sqs_2.12:0.14.1'
testCompile 'org.testcontainers:localstack:1.11.2'
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
Expand Down
2 changes: 1 addition & 1 deletion sdks/java/io/amazon-web-services2/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ dependencies {
testCompile library.java.hamcrest_library
testCompile library.java.powermock
testCompile library.java.powermock_mockito
testCompile library.java.testcontainers_localstack
testCompile "org.assertj:assertj-core:3.11.1"
testCompile 'org.testcontainers:testcontainers:1.11.3'
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
}
Expand Down
1 change: 1 addition & 0 deletions sdks/java/io/kinesis/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ dependencies {
testCompile library.java.hamcrest_library
testCompile library.java.powermock
testCompile library.java.powermock_mockito
testCompile library.java.testcontainers_localstack
testCompile "org.assertj:assertj-core:3.11.1"
testRuntimeOnly library.java.slf4j_jdk14
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.amazonaws.services.kinesis.producer.IKinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import java.net.URI;
import org.checkerframework.checker.nullness.qual.Nullable;

/** Basic implementation of {@link AWSClientsProvider} used by default in {@link KinesisIO}. */
Expand All @@ -39,16 +40,27 @@ class BasicKinesisProvider implements AWSClientsProvider {
private final String secretKey;
private final Regions region;
private final @Nullable String serviceEndpoint;
private final boolean verifyCertificate;

BasicKinesisProvider(
String accessKey, String secretKey, Regions region, @Nullable String serviceEndpoint) {
String accessKey,
String secretKey,
Regions region,
@Nullable String serviceEndpoint,
boolean verifyCertificate) {
checkArgument(accessKey != null, "accessKey can not be null");
checkArgument(secretKey != null, "secretKey can not be null");
checkArgument(region != null, "region can not be null");
this.accessKey = accessKey;
this.secretKey = secretKey;
this.region = region;
this.serviceEndpoint = serviceEndpoint;
this.verifyCertificate = verifyCertificate;
}

BasicKinesisProvider(
String accessKey, String secretKey, Regions region, @Nullable String serviceEndpoint) {
this(accessKey, secretKey, region, serviceEndpoint, true);
}

private AWSCredentialsProvider getCredentialsProvider() {
Expand Down Expand Up @@ -85,6 +97,12 @@ public AmazonCloudWatch getCloudWatchClient() {
public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration config) {
config.setRegion(region.getName());
config.setCredentialsProvider(getCredentialsProvider());
if (serviceEndpoint != null) {
URI uri = URI.create(serviceEndpoint);
config.setKinesisEndpoint(uri.getHost());
config.setKinesisPort(uri.getPort());
}
config.setVerifyCertificate(verifyCertificate);
return new KinesisProducer(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,28 @@ public Read withAWSClientsProvider(
new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
}

/**
* Specify credential details and region to be used to read from Kinesis. If you need more
* sophisticated credential protocol, then you should look at {@link
* Read#withAWSClientsProvider(AWSClientsProvider)}.
*
* <p>The {@code serviceEndpoint} sets an alternative service host. This is useful to execute
* the tests with Kinesis service emulator.
*
* <p>The {@code verifyCertificate} disables or enables certificate verification. Never set it
* to false in production.
*/
public Read withAWSClientsProvider(
String awsAccessKey,
String awsSecretKey,
Regions region,
String serviceEndpoint,
boolean verifyCertificate) {
return withAWSClientsProvider(
new BasicKinesisProvider(
awsAccessKey, awsSecretKey, region, serviceEndpoint, verifyCertificate));
}

/** Specifies to read at most a given number of records. */
public Read withMaxNumRecords(long maxNumRecords) {
checkArgument(
Expand Down Expand Up @@ -670,6 +692,28 @@ public Write withAWSClientsProvider(
new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
}

/**
* Specify credential details and region to be used to write to Kinesis. If you need more
* sophisticated credential protocol, then you should look at {@link
* Write#withAWSClientsProvider(AWSClientsProvider)}.
*
* <p>The {@code serviceEndpoint} sets an alternative service host. This is useful to execute
* the tests with Kinesis service emulator.
*
* <p>The {@code verifyCertificate} disables or enables certificate verification. Never set it
* to false in production.
*/
public Write withAWSClientsProvider(
String awsAccessKey,
String awsSecretKey,
Regions region,
String serviceEndpoint,
boolean verifyCertificate) {
return withAWSClientsProvider(
new BasicKinesisProvider(
awsAccessKey, awsSecretKey, region, serviceEndpoint, verifyCertificate));
}

/**
* Specify the number of retries that will be used to flush the outstanding records in case if
* they were not flushed from the first time. Default number of retries is {@code
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@
*/
package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.SDKGlobalConfiguration;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
Expand All @@ -35,33 +42,51 @@
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.testcontainers.containers.localstack.LocalStackContainer;

/**
* Integration test, that writes and reads data to and from real Kinesis. You need to provide {@link
* KinesisTestOptions} in order to run this.
* KinesisTestOptions} in order to run this if you want to test it with production setup. By default
* when no options are provided an instance of localstack is used.
*/
@RunWith(JUnit4.class)
public class KinesisIOIT implements Serializable {
private static int numberOfShards;
private static int numberOfRows;
private static final String LOCALSTACK_VERSION = "0.11.3";

@Rule public TestPipeline pipelineWrite = TestPipeline.create();
@Rule public TestPipeline pipelineRead = TestPipeline.create();

private static KinesisTestOptions options;
private static final Instant now = Instant.now();

private static AmazonKinesis kinesisClient;
private static LocalStackContainer localstackContainer;
private static Instant now = Instant.now();

@BeforeClass
public static void setup() {
public static void setup() throws Exception {
PipelineOptionsFactory.register(KinesisTestOptions.class);
options = TestPipeline.testingPipelineOptions().as(KinesisTestOptions.class);
numberOfShards = options.getNumberOfShards();
numberOfRows = options.getNumberOfRecords();
if (options.getUseLocalstack()) {
setupLocalstack();
kinesisClient = createKinesisClient();
createStream(options.getAwsKinesisStream());
}
}

@AfterClass
public static void teardown() {
if (options.getUseLocalstack()) {
kinesisClient.deleteStream(options.getAwsKinesisStream());
System.clearProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY);
System.clearProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY);
localstackContainer.stop();
}
}

/** Test which write and then read data for a Kinesis stream. */
Expand All @@ -74,7 +99,7 @@ public void testWriteThenRead() {
/** Write test dataset into Kinesis stream. */
private void runWrite() {
pipelineWrite
.apply("Generate Sequence", GenerateSequence.from(0).to((long) numberOfRows))
.apply("Generate Sequence", GenerateSequence.from(0).to(options.getNumberOfRecords()))
.apply("Prepare TestRows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
.apply("Prepare Kinesis input records", ParDo.of(new ConvertToBytes()))
.apply(
Expand All @@ -85,7 +110,9 @@ private void runWrite() {
.withAWSClientsProvider(
options.getAwsAccessKey(),
options.getAwsSecretKey(),
Regions.fromName(options.getAwsKinesisRegion())));
Regions.fromName(options.getAwsKinesisRegion()),
options.getAwsServiceEndpoint(),
options.getAwsVerifyCertificate()));

pipelineWrite.run().waitUntilFinish();
}
Expand All @@ -99,28 +126,100 @@ private void runRead() {
.withAWSClientsProvider(
options.getAwsAccessKey(),
options.getAwsSecretKey(),
Regions.fromName(options.getAwsKinesisRegion()))
.withMaxNumRecords(numberOfRows)
Regions.fromName(options.getAwsKinesisRegion()),
options.getAwsServiceEndpoint(),
options.getAwsVerifyCertificate())
.withMaxNumRecords(options.getNumberOfRecords())
// to prevent endless running in case of error
.withMaxReadTime(Duration.standardMinutes(10))
.withMaxReadTime(Duration.standardMinutes(10L))
.withInitialPositionInStream(InitialPositionInStream.AT_TIMESTAMP)
.withInitialTimestampInStream(now)
.withRequestRecordsLimit(1000));

PAssert.thatSingleton(output.apply("Count All", Count.globally()))
.isEqualTo((long) numberOfRows);
.isEqualTo((long) options.getNumberOfRecords());

PCollection<String> consolidatedHashcode =
output
.apply(ParDo.of(new ExtractDataValues()))
.apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults());

PAssert.that(consolidatedHashcode)
.containsInAnyOrder(TestRow.getExpectedHashForRowCount(numberOfRows));
.containsInAnyOrder(TestRow.getExpectedHashForRowCount(options.getNumberOfRecords()));

pipelineRead.run().waitUntilFinish();
}

/** Necessary setup for localstack environment. */
private static void setupLocalstack() {
// For some unclear reason localstack requires a timestamp in seconds
now = Instant.ofEpochMilli(Long.divideUnsigned(now.getMillis(), 1000L));

System.setProperty(SDKGlobalConfiguration.DISABLE_CERT_CHECKING_SYSTEM_PROPERTY, "true");
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true");

localstackContainer =
new LocalStackContainer(LOCALSTACK_VERSION)
.withServices(LocalStackContainer.Service.KINESIS)
.withEnv("USE_SSL", "true")
.withStartupAttempts(3);
localstackContainer.start();

options.setAwsServiceEndpoint(
localstackContainer
.getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
.getServiceEndpoint()
.replace("http", "https"));
options.setAwsKinesisRegion(
localstackContainer
.getEndpointConfiguration(LocalStackContainer.Service.KINESIS)
.getSigningRegion());
options.setAwsAccessKey(
localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSAccessKeyId());
options.setAwsSecretKey(
localstackContainer.getDefaultCredentialsProvider().getCredentials().getAWSSecretKey());
options.setNumberOfRecords(1000);
options.setNumberOfShards(1);
options.setAwsKinesisStream("beam_kinesis_test");
options.setAwsVerifyCertificate(false);
}

private static AmazonKinesis createKinesisClient() {
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();

AWSCredentialsProvider credentialsProvider =
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(options.getAwsAccessKey(), options.getAwsSecretKey()));
clientBuilder.setCredentials(credentialsProvider);

if (options.getAwsServiceEndpoint() != null) {
AwsClientBuilder.EndpointConfiguration endpointConfiguration =
new AwsClientBuilder.EndpointConfiguration(
options.getAwsServiceEndpoint(), options.getAwsKinesisRegion());
clientBuilder.setEndpointConfiguration(endpointConfiguration);
} else {
clientBuilder.setRegion(options.getAwsKinesisRegion());
}

return clientBuilder.build();
}

private static void createStream(String streamName) throws Exception {
kinesisClient.createStream(streamName, 1);
int repeats = 10;
for (int i = 0; i <= repeats; ++i) {
String streamStatus =
kinesisClient.describeStream(streamName).getStreamDescription().getStreamStatus();
if ("ACTIVE".equals(streamStatus)) {
break;
}
if (i == repeats) {
throw new RuntimeException("Unable to initialize stream");
}
Thread.sleep(1000L);
}
}

/** Produces test rows. */
private static class ConvertToBytes extends DoFn<TestRow, byte[]> {
@ProcessElement
Expand All @@ -141,7 +240,7 @@ private static final class RandomPartitioner implements KinesisPartitioner {
@Override
public String getPartitionKey(byte[] value) {
Random rand = new Random();
int n = rand.nextInt(numberOfShards) + 1;
int n = rand.nextInt(options.getNumberOfShards()) + 1;
return String.valueOf(n);
}

Expand Down
Loading

0 comments on commit dfadde2

Please sign in to comment.