
Adding resharding integration tests and changing ITs to not run by default #1152

Merged: 8 commits, Aug 3, 2023
14 changes: 8 additions & 6 deletions README.md
@@ -32,15 +32,17 @@ Please open an issue if you have any questions.
## Building from Source

After you've downloaded the code from GitHub, you can build it using Maven. To disable GPG signing in the build, use
this command: `mvn clean install -Dgpg.skip=true`. Note: This command runs Integration tests, which in turn creates AWS
resources (which requires manual cleanup). Integration tests require valid AWS credentials need to be discovered at
runtime. To skip running integration tests, add ` -DskipITs` option to the build command.
this command: `mvn clean install -Dgpg.skip=true`.
Note: This command does not run integration tests.

## Running Integration Tests

To run integration tests: `mvn -Dit.test=*IntegrationTest verify`.
This will look for a default AWS profile specified in your local `.aws/credentials`.
Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn -Dit.test=*IntegrationTest -DawsProfile="<PROFILE_NAME>" verify`.
Note that running integration tests creates AWS resources.
Integration tests require valid AWS credentials.
This will look for a default AWS profile specified in your local `.aws/credentials`.
To run all integration tests: `mvn verify -DskipITs=false`.
To run a single integration test: `mvn -Dit.test=*IntegrationTest -DskipITs=false verify`.
Optionally, you can provide the name of an IAM user/role to run tests with as a string using this command: `mvn verify -DskipITs=false -DawsProfile="<PROFILE_NAME>"`.

## Integration with the Kinesis Producer Library
For producer-side developers using the **[Kinesis Producer Library (KPL)][kinesis-guide-kpl]**, the KCL integrates without additional effort. When the KCL retrieves an aggregated Amazon Kinesis record consisting of multiple KPL user records, it will automatically invoke the KPL to extract the individual user records before returning them to the user.
2 changes: 2 additions & 0 deletions amazon-kinesis-client/pom.xml
@@ -52,6 +52,7 @@
<sqlite4java.libpath>${project.build.directory}/test-lib</sqlite4java.libpath>
<slf4j.version>2.0.7</slf4j.version>
<gsr.version>1.1.14</gsr.version>
<skipITs>true</skipITs>
</properties>

<dependencies>
@@ -199,6 +200,7 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.2</version>
<configuration>
<skipITs>${skipITs}</skipITs>
<excludes>
<exclude>**/*IntegrationTest.java</exclude>
</excludes>
@@ -1,13 +1,19 @@
package software.amazon.kinesis.utils;
package software.amazon.kinesis.application;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryResponse;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.ScalingType;
import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest;
import software.amazon.awssdk.services.kinesis.model.UpdateShardCountResponse;
import software.amazon.kinesis.checkpoint.CheckpointConfig;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@@ -19,9 +25,14 @@
import software.amazon.kinesis.metrics.MetricsConfig;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.retrieval.RetrievalConfig;
import software.amazon.kinesis.utils.LeaseTableManager;
import software.amazon.kinesis.utils.RecordValidationStatus;
import software.amazon.kinesis.utils.ReshardOptions;
import software.amazon.kinesis.utils.StreamExistenceManager;

import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -78,8 +89,10 @@ public void run() throws Exception {
try {
startConsumer();

// Sleep for three minutes to allow the producer/consumer to run and then end the test case.
Thread.sleep(TimeUnit.SECONDS.toMillis(60 * 3));
// Sleep to allow the producer/consumer to run and then end the test case.
// If not resharding, sleep 3 minutes; otherwise sleep 4 minutes per scheduled scale.
final int sleepMinutes = (consumerConfig.getReshardFactorList() == null) ? 3 : (4 * consumerConfig.getReshardFactorList().size());
Thread.sleep(TimeUnit.MINUTES.toMillis(sleepMinutes));

// Stops sending dummy data.
stopProducer();
@@ -115,9 +128,25 @@ private void cleanTestResources(StreamExistenceManager streamExistenceManager, L
}

private void startProducer() {
// Send dummy data to stream
this.producerExecutor = Executors.newSingleThreadScheduledExecutor();
this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 60, 1, TimeUnit.SECONDS);
this.producerFuture = producerExecutor.scheduleAtFixedRate(this::publishRecord, 10, 1, TimeUnit.SECONDS);

// Reshard logic if required for the test
if (consumerConfig.getReshardFactorList() != null) {
log.info("----Reshard Config found: {}", consumerConfig.getReshardFactorList());

final StreamScaler s = new StreamScaler(
kinesisClient,
consumerConfig.getStreamName(),
consumerConfig.getReshardFactorList(),
consumerConfig
);

// Schedule the stream scales 4 minutes apart with 2 minute starting delay
for (int i = 0; i < consumerConfig.getReshardFactorList().size(); i++) {
producerExecutor.schedule(s, (4 * i) + 2, TimeUnit.MINUTES);
}
}
}
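As a worked example of the timing above (not part of the diff): with a two-entry reshard factor list such as [SPLIT, MERGE], the scaler runs are scheduled at roughly t = 2 min and t = 6 min, while the sleep in run() is 4 * 2 = 8 minutes, leaving about two minutes after the last reshard for the consumer to catch up before shutdown begins.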

private void setUpConsumerResources() throws Exception {
@@ -128,7 +157,9 @@ private void setUpConsumerResources() throws Exception {
checkpointConfig = configsBuilder.checkpointConfig();
coordinatorConfig = configsBuilder.coordinatorConfig();
leaseManagementConfig = configsBuilder.leaseManagementConfig()
.initialPositionInStream(InitialPositionInStreamExtended.newInitialPosition(consumerConfig.getInitialPosition()))
.initialPositionInStream(
InitialPositionInStreamExtended.newInitialPosition(consumerConfig.getInitialPosition())
)
.initialLeaseTableReadCapacity(50).initialLeaseTableWriteCapacity(50);
lifecycleConfig = configsBuilder.lifecycleConfig();
processorConfig = configsBuilder.processorConfig();
@@ -152,6 +183,16 @@ private void startConsumer() {
this.consumerFuture = consumerExecutor.schedule(scheduler, 0, TimeUnit.SECONDS);
}

private void stopProducer() {
log.info("Cancelling producer and shutting down executor.");
if (producerFuture != null) {
producerFuture.cancel(false);
}
if (producerExecutor != null) {
producerExecutor.shutdown();
}
}

public void publishRecord() {
final PutRecordRequest request;
try {
@@ -175,7 +216,7 @@ public void publishRecord() {

private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) throws RuntimeException {
final byte[] returnData;
log.info("--------------Putting record with data: {}", payloadCounter);
log.info("---------Putting record with data: {}", payloadCounter);
try {
returnData = mapper.writeValueAsBytes(payloadCounter);
} catch (Exception e) {
@@ -184,12 +225,6 @@ private ByteBuffer wrapWithCounter(int payloadSize, BigInteger payloadCounter) t
return ByteBuffer.wrap(returnData);
}

private void stopProducer() {
log.info("Cancelling producer and shutting down executor.");
producerFuture.cancel(false);
producerExecutor.shutdown();
}

private void awaitConsumerFinish() throws Exception {
Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
log.info("Waiting up to 20 seconds for shutdown to complete.");
@@ -198,7 +233,7 @@ private void awaitConsumerFinish() throws Exception {
} catch (InterruptedException e) {
log.info("Interrupted while waiting for graceful shutdown. Continuing.");
} catch (ExecutionException | TimeoutException e) {
throw e;
scheduler.shutdown();
}
log.info("Completed, shutting down now.");
}
@@ -209,15 +244,61 @@ private void validateRecordProcessor() throws Exception {
if (errorVal != RecordValidationStatus.NO_ERROR) {
throw new RuntimeException("There was an error validating the records that were processed: " + errorVal.toString());
}
log.info("--------------Completed validation of processed records.--------------");
log.info("---------Completed validation of processed records.---------");
}

private void deleteResources(StreamExistenceManager streamExistenceManager, LeaseTableManager leaseTableManager) throws Exception {
log.info("-------------Start deleting stream.----------------");
log.info("-------------Start deleting stream.---------");
streamExistenceManager.deleteResource(this.streamName);
log.info("-------------Start deleting lease table.----------------");
log.info("---------Start deleting lease table.---------");
leaseTableManager.deleteResource(this.consumerConfig.getStreamName());
log.info("-------------Finished deleting resources.----------------");
log.info("---------Finished deleting resources.---------");
}

@Data
private static class StreamScaler implements Runnable {
private final KinesisAsyncClient client;
private final String streamName;
private final List<ReshardOptions> scalingFactors;
private final KCLAppConfig consumerConfig;
private int scalingFactorIdx = 0;
private DescribeStreamSummaryRequest describeStreamSummaryRequest;

private synchronized void scaleStream() throws InterruptedException, ExecutionException {
final DescribeStreamSummaryResponse response = client.describeStreamSummary(describeStreamSummaryRequest).get();

final int openShardCount = response.streamDescriptionSummary().openShardCount();
final int targetShardCount = scalingFactors.get(scalingFactorIdx).calculateShardCount(openShardCount);

log.info("Scaling stream {} from {} shards to {} shards w/ scaling factor {}",
streamName, openShardCount, targetShardCount, scalingFactors.get(scalingFactorIdx));

final UpdateShardCountRequest updateShardCountRequest = UpdateShardCountRequest.builder()
.streamName(streamName).targetShardCount(targetShardCount).scalingType(ScalingType.UNIFORM_SCALING).build();
final UpdateShardCountResponse shardCountResponse = client.updateShardCount(updateShardCountRequest).get();
log.info("Executed shard scaling request. Response Details : {}", shardCountResponse.toString());

scalingFactorIdx++;
}

@Override
public void run() {
[Reviewer comment, Contributor] This is not synchronized, it's a mutable class, and it's scheduled for execution. The only saving grace is the Executor is currently single-threaded, which is not guaranteed to remain that way (e.g., a future refactor). (A hedged thread-safety sketch appears after this file's diff.)

if (scalingFactors.size() == 0 || scalingFactorIdx >= scalingFactors.size()) {
log.info("No scaling factor found in list");
return;
}
log.info("Starting stream scaling with params : {}", this);

if (describeStreamSummaryRequest == null) {
describeStreamSummaryRequest = DescribeStreamSummaryRequest.builder().streamName(streamName).build();
}
try {
scaleStream();
} catch (InterruptedException | ExecutionException e) {
log.error("Caught error while scaling shards for stream", e);
} finally {
log.info("Reshard List State : {}", scalingFactors);
}
}
}
}
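To illustrate the reviewer's thread-safety concern above, here is a minimal, hypothetical sketch (not part of this PR) of how the scaler's mutable index could be protected against a future multi-threaded executor, for example by holding it in an AtomicInteger. Names mirror the PR code but the class is illustrative only; it assumes the same SDK v2 Kinesis client and the ReshardOptions.calculateShardCount(int) call used by the PR's StreamScaler.

```java
// Hypothetical sketch only; not part of this PR.
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.ScalingType;
import software.amazon.awssdk.services.kinesis.model.UpdateShardCountRequest;
import software.amazon.kinesis.utils.ReshardOptions;

@Slf4j
@RequiredArgsConstructor
class ThreadSafeStreamScaler implements Runnable {
    private final KinesisAsyncClient client;
    private final String streamName;
    private final List<ReshardOptions> scalingFactors;
    // The index is the only mutable state; an AtomicInteger gives each scheduled run
    // its own slot even if the executor is ever made multi-threaded.
    private final AtomicInteger nextFactor = new AtomicInteger(0);

    @Override
    public void run() {
        final int idx = nextFactor.getAndIncrement();
        if (idx >= scalingFactors.size()) {
            log.info("No scaling factors left to apply");
            return;
        }
        final DescribeStreamSummaryRequest describeRequest =
                DescribeStreamSummaryRequest.builder().streamName(streamName).build();
        try {
            // Read the current open shard count, then issue a uniform scaling request.
            final int openShardCount = client.describeStreamSummary(describeRequest).get()
                    .streamDescriptionSummary().openShardCount();
            final int targetShardCount = scalingFactors.get(idx).calculateShardCount(openShardCount);
            log.info("Scaling stream {} from {} to {} shards", streamName, openShardCount, targetShardCount);
            client.updateShardCount(UpdateShardCountRequest.builder()
                    .streamName(streamName)
                    .targetShardCount(targetShardCount)
                    .scalingType(ScalingType.UNIFORM_SCALING)
                    .build()).get();
        } catch (InterruptedException | ExecutionException e) {
            log.error("Caught error while scaling shards for stream {}", streamName, e);
        }
    }
}
```

Scheduling would then look the same as in the PR, e.g. `producerExecutor.schedule(scaler, (4 * i) + 2, TimeUnit.MINUTES)` for each factor.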
@@ -1,4 +1,4 @@
package software.amazon.kinesis.utils;
package software.amazon.kinesis.application;

import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
@@ -11,6 +11,7 @@
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.retrieval.KinesisClientRecord;
import software.amazon.kinesis.utils.RecordValidatorQueue;

import java.nio.ByteBuffer;

@@ -1,7 +1,8 @@
package software.amazon.kinesis.utils;
package software.amazon.kinesis.application;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.utils.RecordValidatorQueue;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {

@@ -5,7 +5,7 @@
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.utils.RecordValidatorQueue;
import software.amazon.kinesis.utils.ReshardOptions;
import software.amazon.kinesis.utils.TestRecordProcessorFactory;
import software.amazon.kinesis.application.TestRecordProcessorFactory;
import lombok.Builder;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
@@ -29,6 +29,7 @@
import java.net.Inet4Address;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.List;

/**
* Default configuration for a producer or consumer used in integration tests.
@@ -75,7 +76,7 @@ public ProducerConfig getProducerConfig() {
.build();
}

public ReshardConfig getReshardConfig() {
public List<ReshardOptions> getReshardFactorList() {
return null;
}

@@ -157,34 +158,11 @@ public RetrievalConfig getRetrievalConfig() throws IOException, URISyntaxExcepti
*/
@Value
@Builder
static class ProducerConfig {
public static class ProducerConfig {
private boolean isBatchPut;
private int batchSize;
private int recordSizeKB;
private long callPeriodMills;
}

/**
* Description of the method of resharding for a test case
*/
@Value
@Builder
static class ReshardConfig {
/**
* reshardingFactorCycle: lists the order or reshards that will be done during one reshard cycle
* e.g {SPLIT, MERGE} means that the number of shards will first be doubled, then halved
*/
private ReshardOptions[] reshardingFactorCycle;

/**
* numReshardCycles: the number of resharding cycles that will be executed in a test
*/
private int numReshardCycles;

/**
* reshardFrequencyMillis: the period of time between reshard cycles (in milliseconds)
*/
private long reshardFrequencyMillis;
}

}
@@ -0,0 +1,34 @@
package software.amazon.kinesis.config;

import software.amazon.awssdk.http.Protocol;
import software.amazon.kinesis.utils.ReshardOptions;

import java.util.Arrays;
import java.util.List;
import java.util.UUID;

import static software.amazon.kinesis.utils.ReshardOptions.MERGE;
import static software.amazon.kinesis.utils.ReshardOptions.SPLIT;

public class ReleaseCanaryStreamingReshardingTestConfig extends KCLAppConfig {

private final UUID uniqueId = UUID.randomUUID();
@Override
public String getStreamName() {
return "KCLReleaseCanary2XStreamingReshardingTestStream_" + uniqueId;
}

@Override
public Protocol getKinesisClientProtocol() { return Protocol.HTTP2; }

@Override
public int getShardCount() {
return 100;
}

@Override
public List<ReshardOptions> getReshardFactorList() {
return Arrays.asList(SPLIT, MERGE);
}

}
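For reference, the ReshardOptions enum from software.amazon.kinesis.utils is used here and by the StreamScaler above but is not shown in this diff. Based on the removed ReshardConfig javadoc ("{SPLIT, MERGE} means that the number of shards will first be doubled, then halved") and the calculateShardCount call, a hypothetical sketch of its shape might be:

```java
// Hypothetical sketch; the real enum is not part of this diff.
package software.amazon.kinesis.utils;

public enum ReshardOptions {
    SPLIT {
        @Override
        public int calculateShardCount(int openShardCount) {
            return openShardCount * 2;   // assumed: splitting doubles the open shard count
        }
    },
    MERGE {
        @Override
        public int calculateShardCount(int openShardCount) {
            return openShardCount / 2;   // assumed: merging halves the open shard count
        }
    };

    public abstract int calculateShardCount(int openShardCount);
}
```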
@@ -5,7 +5,7 @@
import software.amazon.kinesis.config.ReleaseCanaryPollingH2TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryPollingH1TestConfig;
import software.amazon.kinesis.config.ReleaseCanaryStreamingTestConfig;
import software.amazon.kinesis.utils.TestConsumer;
import software.amazon.kinesis.application.TestConsumer;

public class BasicStreamConsumerIntegrationTest {

@@ -0,0 +1,15 @@
package software.amazon.kinesis.lifecycle;

import org.junit.Test;
import software.amazon.kinesis.config.KCLAppConfig;
import software.amazon.kinesis.config.ReleaseCanaryStreamingReshardingTestConfig;
import software.amazon.kinesis.application.TestConsumer;

public class ReshardIntegrationTest {
@Test
public void kclReleaseCanaryStreamingReshardingTest() throws Exception {
KCLAppConfig consumerConfig = new ReleaseCanaryStreamingReshardingTestConfig();
TestConsumer consumer = new TestConsumer(consumerConfig);
consumer.run();
}
}
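Following the README instructions above, this test would presumably be run on its own with `mvn -Dit.test=ReshardIntegrationTest -DskipITs=false verify`; like the other canary tests, it creates AWS resources (here a 100-shard stream and a lease table) and deletes them at the end of the run.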