Skip to content

Commit

Permalink
Merge pull request #10973: [BEAM-9476] KinesisIO retry LimitExceededException
Browse files Browse the repository at this point in the history
  • Loading branch information
aromanenko-dev authored Apr 3, 2020
2 parents 65db744 + 48f2802 commit a259c3a
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Minutes;

Expand All @@ -53,6 +58,9 @@ class SimplifiedKinesisClient {
private static final int PERIOD_GRANULARITY_IN_SECONDS = 60;
private static final String SUM_STATISTIC = "Sum";
private static final String STREAM_NAME_DIMENSION = "StreamName";
private static final int LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS = 10;
private static final Duration LIST_SHARDS_DESCRIBE_STREAM_INITIAL_BACKOFF =
Duration.standardSeconds(1);
private final AmazonKinesis kinesis;
private final AmazonCloudWatch cloudWatch;
private final Integer limit;
Expand Down Expand Up @@ -96,9 +104,31 @@ public List<Shard> listShards(final String streamName) throws TransientKinesisEx
List<Shard> shards = Lists.newArrayList();
String lastShardId = null;

StreamDescription description;
// DescribeStream has limits that can be hit fairly easily if we are attempting
// to configure multiple KinesisIO inputs in the same account. Retry up to
// LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS times if we end up hitting that limit.
//
// Only pass the wrapped exception up once that limit is reached. Use FluentBackoff
// to implement the retry policy.
FluentBackoff retryBackoff =
FluentBackoff.DEFAULT
.withMaxRetries(LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS)
.withInitialBackoff(LIST_SHARDS_DESCRIBE_STREAM_INITIAL_BACKOFF);
StreamDescription description = null;
do {
description = kinesis.describeStream(streamName, lastShardId).getStreamDescription();
BackOff backoff = retryBackoff.backoff();
Sleeper sleeper = Sleeper.DEFAULT;
while (true) {
try {
description =
kinesis.describeStream(streamName, lastShardId).getStreamDescription();
break;
} catch (LimitExceededException exc) {
if (!BackOffUtils.next(sleeper, backoff)) {
throw exc;
}
}
}

shards.addAll(description.getShards());
lastShardId = shards.get(shards.size() - 1).getShardId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest;
import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodResult;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.ListStreamConsumersRequest;
Expand Down Expand Up @@ -161,18 +162,31 @@ static class Provider implements AWSClientsProvider {
private final List<List<TestData>> shardedData;
private final int numberOfRecordsPerGet;

private int rateLimitDescribeStream = 0;

/** Creates a provider that serves the given per-shard test data, {@code numberOfRecordsPerGet} records per GetRecords call. */
public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
  this.numberOfRecordsPerGet = numberOfRecordsPerGet;
  this.shardedData = shardedData;
}

/**
 * Configures the provider to simulate an initially rate-limited DescribeStream API.
 *
 * @param rateLimitDescribeStream how many DescribeStream requests are rejected before one succeeds
 * @return this provider, to allow chaining
 */
public Provider withRateLimitedDescribeStream(int rateLimitDescribeStream) {
  this.rateLimitDescribeStream = rateLimitDescribeStream;
  return this;
}

@Override
public AmazonKinesis getKinesisClient() {
return new AmazonKinesisMock(
shardedData.stream()
.map(testDatas -> transform(testDatas, TestData::convertToRecord))
.collect(Collectors.toList()),
numberOfRecordsPerGet);
shardedData.stream()
.map(testDatas -> transform(testDatas, TestData::convertToRecord))
.collect(Collectors.toList()),
numberOfRecordsPerGet)
.withRateLimitedDescribeStream(rateLimitDescribeStream);
}

@Override
Expand All @@ -189,11 +203,18 @@ public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration confi
private final List<List<Record>> shardedData;
private final int numberOfRecordsPerGet;

private int rateLimitDescribeStream = 0;

/** Creates the mock client backed by the given per-shard records, paged {@code numberOfRecordsPerGet} at a time. */
public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
  this.numberOfRecordsPerGet = numberOfRecordsPerGet;
  this.shardedData = shardedData;
}

/**
 * Makes the next {@code rateLimitDescribeStream} DescribeStream calls throw
 * {@code LimitExceededException} before requests start succeeding.
 *
 * @return this mock, to allow chaining
 */
public AmazonKinesisMock withRateLimitedDescribeStream(int rateLimitDescribeStream) {
  this.rateLimitDescribeStream = rateLimitDescribeStream;
  return this;
}

@Override
public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
List<String> shardIteratorParts =
Expand Down Expand Up @@ -227,6 +248,9 @@ public GetShardIteratorResult getShardIterator(GetShardIteratorRequest getShardI

@Override
public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
if (rateLimitDescribeStream-- > 0) {
throw new LimitExceededException("DescribeStream rate limit exceeded");
}
int nextShardId = 0;
if (exclusiveStartShardId != null) {
nextShardId = parseInt(exclusiveStartShardId) + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import java.util.List;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
Expand All @@ -39,19 +40,40 @@ public class KinesisMockReadTest {

@Rule public final transient TestPipeline p = TestPipeline.create();

private final int noOfShards = 3;
private final int noOfEventsPerShard = 100;

@Test
public void readsDataFromMockKinesis() {
  // Happy path: no simulated DescribeStream rate limiting. (The diff rendering kept the
  // removed pre-image lines alongside the new one; this is the coherent post-image.)
  List<List<AmazonKinesisMock.TestData>> testData = defaultTestData();
  verifyReadWithProvider(new AmazonKinesisMock.Provider(testData, 10), testData);
}

@Test
public void readsDataFromMockKinesisWithDescribeStreamRateLimit() {
  List<List<AmazonKinesisMock.TestData>> testData = defaultTestData();
  // Two rate-limited DescribeStream responses stay within the retry budget,
  // so the read should still succeed.
  AmazonKinesisMock.Provider provider =
      new AmazonKinesisMock.Provider(testData, 10).withRateLimitedDescribeStream(2);
  verifyReadWithProvider(provider, testData);
}

@Test(expected = PipelineExecutionException.class)
public void readsDataFromMockKinesisWithDescribeStreamRateLimitFailure() {
List<List<AmazonKinesisMock.TestData>> testData = defaultTestData();
// Verify with a provider that will generate more LimitExceededExceptions than we
// will retry. Should result in generation of a TransientKinesisException and subsequently
// a PipelineExecutionException.
verifyReadWithProvider(
new AmazonKinesisMock.Provider(testData, 10).withRateLimitedDescribeStream(11), testData);
}

public void verifyReadWithProvider(
AmazonKinesisMock.Provider provider, List<List<AmazonKinesisMock.TestData>> testData) {
PCollection<AmazonKinesisMock.TestData> result =
p.apply(
KinesisIO.read()
.withStreamName("stream")
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
.withAWSClientsProvider(new AmazonKinesisMock.Provider(testData, 10))
.withAWSClientsProvider(provider)
.withArrivalTimeWatermarkPolicy()
.withMaxNumRecords(noOfShards * noOfEventsPerShard))
.apply(ParDo.of(new KinesisRecordToTestData()));
Expand All @@ -67,6 +89,10 @@ public void processElement(ProcessContext c) throws Exception {
}
}

/** Builds the standard fixture: {@code noOfShards} shards with {@code noOfEventsPerShard} events each. */
private List<List<AmazonKinesisMock.TestData>> defaultTestData() {
return provideTestData(noOfShards, noOfEventsPerShard);
}

private List<List<AmazonKinesisMock.TestData>> provideTestData(
int noOfShards, int noOfEventsPerShard) {

Expand Down

0 comments on commit a259c3a

Please sign in to comment.