Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-9476] KinesisIO retry LimitExceededException #10973

Merged
merged 1 commit into from
Apr 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.BackOffUtils;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.Minutes;

Expand All @@ -53,6 +58,9 @@ class SimplifiedKinesisClient {
private static final int PERIOD_GRANULARITY_IN_SECONDS = 60;
private static final String SUM_STATISTIC = "Sum";
private static final String STREAM_NAME_DIMENSION = "StreamName";
private static final int LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS = 10;
private static final Duration LIST_SHARDS_DESCRIBE_STREAM_INITIAL_BACKOFF =
Duration.standardSeconds(1);
private final AmazonKinesis kinesis;
private final AmazonCloudWatch cloudWatch;
private final Integer limit;
Expand Down Expand Up @@ -96,9 +104,31 @@ public List<Shard> listShards(final String streamName) throws TransientKinesisEx
List<Shard> shards = Lists.newArrayList();
String lastShardId = null;

StreamDescription description;
// DescribeStream has limits that can be hit fairly easily if we are attempting
// to configure multiple KinesisIO inputs in the same account. Retry up to
// LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS times if we end up hitting that limit.
//
// Only pass the wrapped exception up once that limit is reached. Use FluentBackoff
// to implement the retry policy.
FluentBackoff retryBackoff =
FluentBackoff.DEFAULT
.withMaxRetries(LIST_SHARDS_DESCRIBE_STREAM_MAX_ATTEMPTS)
.withInitialBackoff(LIST_SHARDS_DESCRIBE_STREAM_INITIAL_BACKOFF);
StreamDescription description = null;
do {
description = kinesis.describeStream(streamName, lastShardId).getStreamDescription();
BackOff backoff = retryBackoff.backoff();
Sleeper sleeper = Sleeper.DEFAULT;
while (true) {
try {
description =
kinesis.describeStream(streamName, lastShardId).getStreamDescription();
break;
} catch (LimitExceededException exc) {
if (!BackOffUtils.next(sleeper, backoff)) {
throw exc;
}
}
}

shards.addAll(description.getShards());
lastShardId = shards.get(shards.size() - 1).getShardId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodRequest;
import com.amazonaws.services.kinesis.model.IncreaseStreamRetentionPeriodResult;
import com.amazonaws.services.kinesis.model.LimitExceededException;
import com.amazonaws.services.kinesis.model.ListShardsRequest;
import com.amazonaws.services.kinesis.model.ListShardsResult;
import com.amazonaws.services.kinesis.model.ListStreamConsumersRequest;
Expand Down Expand Up @@ -161,18 +162,31 @@ static class Provider implements AWSClientsProvider {
private final List<List<TestData>> shardedData;
private final int numberOfRecordsPerGet;

private int rateLimitDescribeStream = 0;

public Provider(List<List<TestData>> shardedData, int numberOfRecordsPerGet) {
this.shardedData = shardedData;
this.numberOfRecordsPerGet = numberOfRecordsPerGet;
}

/**
* Simulate an initially rate limited DescribeStream.
*
* @param rateLimitDescribeStream The number of rate limited requests before success
*/
public Provider withRateLimitedDescribeStream(int rateLimitDescribeStream) {
this.rateLimitDescribeStream = rateLimitDescribeStream;
return this;
}

@Override
public AmazonKinesis getKinesisClient() {
return new AmazonKinesisMock(
shardedData.stream()
.map(testDatas -> transform(testDatas, TestData::convertToRecord))
.collect(Collectors.toList()),
numberOfRecordsPerGet);
shardedData.stream()
.map(testDatas -> transform(testDatas, TestData::convertToRecord))
.collect(Collectors.toList()),
numberOfRecordsPerGet)
.withRateLimitedDescribeStream(rateLimitDescribeStream);
}

@Override
Expand All @@ -189,11 +203,18 @@ public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration confi
private final List<List<Record>> shardedData;
private final int numberOfRecordsPerGet;

private int rateLimitDescribeStream = 0;

public AmazonKinesisMock(List<List<Record>> shardedData, int numberOfRecordsPerGet) {
this.shardedData = shardedData;
this.numberOfRecordsPerGet = numberOfRecordsPerGet;
}

public AmazonKinesisMock withRateLimitedDescribeStream(int rateLimitDescribeStream) {
this.rateLimitDescribeStream = rateLimitDescribeStream;
return this;
}

@Override
public GetRecordsResult getRecords(GetRecordsRequest getRecordsRequest) {
List<String> shardIteratorParts =
Expand Down Expand Up @@ -227,6 +248,9 @@ public GetShardIteratorResult getShardIterator(GetShardIteratorRequest getShardI

@Override
public DescribeStreamResult describeStream(String streamName, String exclusiveStartShardId) {
if (rateLimitDescribeStream-- > 0) {
throw new LimitExceededException("DescribeStream rate limit exceeded");
}
int nextShardId = 0;
if (exclusiveStartShardId != null) {
nextShardId = parseInt(exclusiveStartShardId) + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import java.util.List;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
Expand All @@ -39,19 +40,40 @@ public class KinesisMockReadTest {

@Rule public final transient TestPipeline p = TestPipeline.create();

private final int noOfShards = 3;
private final int noOfEventsPerShard = 100;

@Test
public void readsDataFromMockKinesis() {
int noOfShards = 3;
int noOfEventsPerShard = 100;
List<List<AmazonKinesisMock.TestData>> testData =
provideTestData(noOfShards, noOfEventsPerShard);
List<List<AmazonKinesisMock.TestData>> testData = defaultTestData();
verifyReadWithProvider(new AmazonKinesisMock.Provider(testData, 10), testData);
}

@Test
public void readsDataFromMockKinesisWithDescribeStreamRateLimit() {
List<List<AmazonKinesisMock.TestData>> testData = defaultTestData();
verifyReadWithProvider(
new AmazonKinesisMock.Provider(testData, 10).withRateLimitedDescribeStream(2), testData);
}

@Test(expected = PipelineExecutionException.class)
public void readsDataFromMockKinesisWithDescribeStreamRateLimitFailure() {
List<List<AmazonKinesisMock.TestData>> testData = defaultTestData();
// Verify with a provider that will generate more LimitExceededExceptions then we
// will retry. Should result in generation of a TransientKinesisException and subsequently
// a PipelineExecutionException.
verifyReadWithProvider(
new AmazonKinesisMock.Provider(testData, 10).withRateLimitedDescribeStream(11), testData);
}

public void verifyReadWithProvider(
AmazonKinesisMock.Provider provider, List<List<AmazonKinesisMock.TestData>> testData) {
PCollection<AmazonKinesisMock.TestData> result =
p.apply(
KinesisIO.read()
.withStreamName("stream")
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
.withAWSClientsProvider(new AmazonKinesisMock.Provider(testData, 10))
.withAWSClientsProvider(provider)
.withArrivalTimeWatermarkPolicy()
.withMaxNumRecords(noOfShards * noOfEventsPerShard))
.apply(ParDo.of(new KinesisRecordToTestData()));
Expand All @@ -67,6 +89,10 @@ public void processElement(ProcessContext c) throws Exception {
}
}

private List<List<AmazonKinesisMock.TestData>> defaultTestData() {
return provideTestData(noOfShards, noOfEventsPerShard);
}

private List<List<AmazonKinesisMock.TestData>> provideTestData(
int noOfShards, int noOfEventsPerShard) {

Expand Down