diff --git a/README.md b/README.md index 3cc927990..b6ebb6a17 100644 --- a/README.md +++ b/README.md @@ -32,15 +32,17 @@ Please open an issue if you have any questions. ## Building from Source After you've downloaded the code from GitHub, you can build it using Maven. To disable GPG signing in the build, use - this command: `mvn clean install -Dgpg.skip=true`. Note: This command runs Integration tests, which in turn creates AWS - resources (which requires manual cleanup). Integration tests require valid AWS credentials need to be discovered at - runtime. To skip running integration tests, add ` -DskipITs` option to the build command. +this command: `mvn clean install -Dgpg.skip=true`. +Note: This command does not run integration tests. ## Running Integration Tests -To run integration tests: `mvn -Dit.test=*IntegrationTest verify`. -This will look for a default AWS profile specified in your local `.aws/credentials`. -Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn -Dit.test=*IntegrationTest -DawsProfile="" verify`. +Note that running integration tests creates AWS resources. +Integration tests require valid AWS credentials. +This will look for a default AWS profile specified in your local `.aws/credentials`. +To run all integration tests: `mvn verify -DskipITs=false`. +To run one integration tests: `mvn -Dit.test=*IntegrationTest -DskipITs=false verify` +Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn verify -DskipITs=false -DawsProfile=""`. ## Integration with the Kinesis Producer Library For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user. diff --git a/amazon-kinesis-client/pom.xml b/amazon-kinesis-client/pom.xml index 456811782..653d581fb 100644 --- a/amazon-kinesis-client/pom.xml +++ b/amazon-kinesis-client/pom.xml @@ -52,6 +52,7 @@ ${project.build.directory}/test-lib 2.0.7 1.1.14 + true @@ -199,6 +200,7 @@ maven-surefire-plugin 2.22.2 + ${skipITs} **/*IntegrationTest.java diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java similarity index 64% rename from amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java index 223ca99a1..3e4e931da 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestConsumer.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestConsumer.java @@ -1,13 +1,19 @@ -package software.amazon.kinesis.utils; +package software.amazon.kinesis.application; import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest; +import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; +import software.amazon.awssdk.services.kinesis.model.ScalingType; +import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest; +import software.amazon.awssdk.services.kinesis.model.UpdateShardCountResponse; import software.amazon.kinesis.checkpoint.CheckpointConfig; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -19,9 +25,14 @@ import software.amazon.kinesis.metrics.MetricsConfig; import software.amazon.kinesis.processor.ProcessorConfig; import software.amazon.kinesis.retrieval.RetrievalConfig; +import software.amazon.kinesis.utils.LeaseTableManager; +import software.amazon.kinesis.utils.RecordValidationStatus; +import software.amazon.kinesis.utils.ReshardOptions; +import software.amazon.kinesis.utils.StreamExistenceManager; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -78,8 +89,10 @@ public void run() throws Exception { try { startConsumer(); - // Sleep for three minutes to allow the producer/consumer to run and then end the test case. - Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3)); + // Sleep to allow the producer/consumer to run and then end the test case. + // If non-reshard sleep 3 minutes, else sleep 4 minutes per scale. + final int sleepMinutes = (consumerConfig.getReshardFactorList() == null) ? 3 : (4 * consumerConfig.getReshardFactorList().size()); + Thread.sleep(TimeUnit.MINUTES.toMillis(sleepMinutes)); // Stops sending dummy data. stopProducer(); @@ -115,9 +128,25 @@ private void cleanTestResources(StreamExistenceManager streamExistenceManager, L } private void startProducer() { - // Send dummy data to stream this.producerExecutor = Executors.newSingleThreadScheduledExecutor(); - this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS); + this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS); + + // Reshard logic if required for the test + if (consumerConfig.getReshardFactorList() != null) { + log.info("----Reshard Config found: {}", consumerConfig.getReshardFactorList()); + + final StreamScaler s = new StreamScaler( + kinesisClient, + consumerConfig.getStreamName(), + consumerConfig.getReshardFactorList(), + consumerConfig + ); + + // Schedule the stream scales 4 minutes apart with 2 minute starting delay + for (int i = 0; i < consumerConfig.getReshardFactorList().size(); i++) { + producerExecutor.schedule(s, (4 * i) + 2, TimeUnit.MINUTES); + } + } } private void setUpConsumerResources() throws Exception { @@ -128,7 +157,9 @@ private void setUpConsumerResources() throws Exception { checkpointConfig = configsBuilder.checkpointConfig(); coordinatorConfig = configsBuilder.coordinatorConfig(); leaseManagementConfig = configsBuilder.leaseManagementConfig() - .initialPositionInStream(InitialPositionInStreamExtended.newInitialPosition(consumerConfig.getInitialPosition())) + .initialPositionInStream( + InitialPositionInStreamExtended.newInitialPosition(consumerConfig.getInitialPosition()) + ) .initialLeaseTableReadCapacity(50).initialLeaseTableWriteCapacity(50); lifecycleConfig = configsBuilder.lifecycleConfig(); processorConfig = configsBuilder.processorConfig(); @@ -152,6 +183,16 @@ private void startConsumer() { this.consumerFuture = consumerExecutor.schedule(scheduler, 0, TimeUnit.SECONDS); } + private void stopProducer() { + log.info("Cancelling producer and shutting down executor."); + if (producerFuture != null) { + producerFuture.cancel(false); + } + if (producerExecutor != null) { + producerExecutor.shutdown(); + } + } + public void publishRecord() { final PutRecordRequest request; try { @@ -175,7 +216,7 @@ public void publishRecord() { private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException { final byte[] returnData; - log.info("--------------Putting record with data: {}", payloadCounter); + log.info("---------Putting record with data: {}", payloadCounter); try { returnData = mapper.writeValueAsBytes(payloadCounter); } catch (Exception e) { @@ -184,12 +225,6 @@ private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) t return ByteBuffer.wrap(returnData); } - private void stopProducer() { - log.info("Cancelling producer and shutting down executor."); - producerFuture.cancel(false); - producerExecutor.shutdown(); - } - private void awaitConsumerFinish() throws Exception { Future gracefulShutdownFuture = scheduler.startGracefulShutdown(); log.info("Waiting up to 20 seconds for shutdown to complete."); @@ -198,7 +233,7 @@ private void awaitConsumerFinish() throws Exception { } catch (InterruptedException e) { log.info("Interrupted while waiting for graceful shutdown. Continuing."); } catch (ExecutionException | TimeoutException e) { - throw e; + scheduler.shutdown(); } log.info("Completed, shutting down now."); } @@ -209,15 +244,61 @@ private void validateRecordProcessor() throws Exception { if (errorVal != RecordValidationStatus.NO_ERROR) { throw new RuntimeException("There was an error validating the records that were processed: " + errorVal.toString()); } - log.info("--------------Completed validation of processed records.--------------"); + log.info("---------Completed validation of processed records.---------"); } private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception { - log.info("-------------Start deleting stream.----------------"); + log.info("-------------Start deleting stream.---------"); streamExistenceManager.deleteResource(this.streamName); - log.info("-------------Start deleting lease table.----------------"); + log.info("---------Start deleting lease table.---------"); leaseTableManager.deleteResource(this.consumerConfig.getStreamName()); - log.info("-------------Finished deleting resources.----------------"); + log.info("---------Finished deleting resources.---------"); } + @Data + private static class StreamScaler implements Runnable { + private final KinesisAsyncClient client; + private final String streamName; + private final List scalingFactors; + private final KCLAppConfig consumerConfig; + private int scalingFactorIdx = 0; + private DescribeStreamSummaryRequest describeStreamSummaryRequest; + + private synchronized void scaleStream() throws InterruptedException, ExecutionException { + final DescribeStreamSummaryResponse response = client.describeStreamSummary(describeStreamSummaryRequest).get(); + + final int openShardCount = response.streamDescriptionSummary().openShardCount(); + final int targetShardCount = scalingFactors.get(scalingFactorIdx).calculateShardCount(openShardCount); + + log.info("Scaling stream {} from {} shards to {} shards w/ scaling factor {}", + streamName, openShardCount, targetShardCount, scalingFactors.get(scalingFactorIdx)); + + final UpdateShardCountRequest updateShardCountRequest = UpdateShardCountRequest.builder() + .streamName(streamName).targetShardCount(targetShardCount).scalingType(ScalingType.UNIFORM_SCALING).build(); + final UpdateShardCountResponse shardCountResponse = client.updateShardCount(updateShardCountRequest).get(); + log.info("Executed shard scaling request. Response Details : {}", shardCountResponse.toString()); + + scalingFactorIdx++; + } + + @Override + public void run() { + if (scalingFactors.size() == 0 || scalingFactorIdx >= scalingFactors.size()) { + log.info("No scaling factor found in list"); + return; + } + log.info("Starting stream scaling with params : {}", this); + + if (describeStreamSummaryRequest == null) { + describeStreamSummaryRequest = DescribeStreamSummaryRequest.builder().streamName(streamName).build(); + } + try { + scaleStream(); + } catch (InterruptedException | ExecutionException e) { + log.error("Caught error while scaling shards for stream", e); + } finally { + log.info("Reshard List State : {}", scalingFactors); + } + } + } } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessor.java similarity index 97% rename from amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessor.java index f3e439156..0e4dc489d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessor.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessor.java @@ -1,4 +1,4 @@ -package software.amazon.kinesis.utils; +package software.amazon.kinesis.application; import lombok.extern.slf4j.Slf4j; import org.slf4j.MDC; @@ -11,6 +11,7 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.utils.RecordValidatorQueue; import java.nio.ByteBuffer; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessorFactory.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessorFactory.java similarity index 84% rename from amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessorFactory.java rename to amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessorFactory.java index 03361b6e5..4e06890e5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/TestRecordProcessorFactory.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/application/TestRecordProcessorFactory.java @@ -1,7 +1,8 @@ -package software.amazon.kinesis.utils; +package software.amazon.kinesis.application; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; +import software.amazon.kinesis.utils.RecordValidatorQueue; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java index 5365ca4f3..b5d0c4d11 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/KCLAppConfig.java @@ -5,7 +5,7 @@ import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.utils.RecordValidatorQueue; import software.amazon.kinesis.utils.ReshardOptions; -import software.amazon.kinesis.utils.TestRecordProcessorFactory; +import software.amazon.kinesis.application.TestRecordProcessorFactory; import lombok.Builder; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider; @@ -29,6 +29,7 @@ import java.net.Inet4Address; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.util.List; /** * Default configuration for a producer or consumer used in integration tests. @@ -75,7 +76,7 @@ public ProducerConfig getProducerConfig() { .build(); } - public ReshardConfig getReshardConfig() { + public List getReshardFactorList() { return null; } @@ -157,34 +158,11 @@ public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxExcepti */ @Value @Builder - static class ProducerConfig { + public static class ProducerConfig { private boolean isBatchPut; private int batchSize; private int recordSizeKB; private long callPeriodMills; } - /** - * Description of the method of resharding for a test case - */ - @Value - @Builder - static class ReshardConfig { - /** - * reshardingFactorCycle: lists the order or reshards that will be done during one reshard cycle - * e.g {SPLIT, MERGE} means that the number of shards will first be doubled, then halved - */ - private ReshardOptions[] reshardingFactorCycle; - - /** - * numReshardCycles: the number of resharding cycles that will be executed in a test - */ - private int numReshardCycles; - - /** - * reshardFrequencyMillis: the period of time between reshard cycles (in milliseconds) - */ - private long reshardFrequencyMillis; - } - } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java new file mode 100644 index 000000000..cfdc5298e --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/config/ReleaseCanaryStreamingReshardingTestConfig.java @@ -0,0 +1,34 @@ +package software.amazon.kinesis.config; + +import software.amazon.awssdk.http.Protocol; +import software.amazon.kinesis.utils.ReshardOptions; + +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +import static software.amazon.kinesis.utils.ReshardOptions.MERGE; +import static software.amazon.kinesis.utils.ReshardOptions.SPLIT; + +public class ReleaseCanaryStreamingReshardingTestConfig extends KCLAppConfig { + + private final UUID uniqueId = UUID.randomUUID(); + @Override + public String getStreamName() { + return "KCLReleaseCanary2XStreamingReshardingTestStream_" + uniqueId; + } + + @Override + public Protocol getKinesisClientProtocol() { return Protocol.HTTP2; } + + @Override + public int getShardCount() { + return 100; + } + + @Override + public List getReshardFactorList() { + return Arrays.asList(SPLIT, MERGE); + } + +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java index e2e446872..d03254c29 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/BasicStreamConsumerIntegrationTest.java @@ -5,7 +5,7 @@ import software.amazon.kinesis.config.ReleaseCanaryPollingH2TestConfig; import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig; import software.amazon.kinesis.config.ReleaseCanaryStreamingTestConfig; -import software.amazon.kinesis.utils.TestConsumer; +import software.amazon.kinesis.application.TestConsumer; public class BasicStreamConsumerIntegrationTest { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ReshardIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ReshardIntegrationTest.java new file mode 100644 index 000000000..aa08980e0 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ReshardIntegrationTest.java @@ -0,0 +1,15 @@ +package software.amazon.kinesis.lifecycle; + +import org.junit.Test; +import software.amazon.kinesis.config.KCLAppConfig; +import software.amazon.kinesis.config.ReleaseCanaryStreamingReshardingTestConfig; +import software.amazon.kinesis.application.TestConsumer; + +public class ReshardIntegrationTest { + @Test + public void kclReleaseCanaryStreamingReshardingTest() throws Exception { + KCLAppConfig consumerConfig = new ReleaseCanaryStreamingReshardingTestConfig(); + TestConsumer consumer = new TestConsumer(consumerConfig); + consumer.run(); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java index fbf5f68b1..f1513cfb5 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/ReshardOptions.java @@ -6,6 +6,16 @@ * Merge halves the number of shards. */ public enum ReshardOptions { - SPLIT, - MERGE + SPLIT { + public int calculateShardCount(int currentShards) { + return (int) (2.0 * currentShards); + } + }, + MERGE { + public int calculateShardCount(int currentShards) { + return (int) (0.5 * currentShards); + } + }; + + public abstract int calculateShardCount(int currentShards); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java index b5f06b786..db8615c35 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/utils/StreamExistenceManager.java @@ -111,5 +111,4 @@ private void createStream(String streamName, int shardCount) { } } } - }