Skip to content

Commit

Permalink
[BEAM-8382] Add rate limit policy to KinesisIO.Read (apache#9765)
Browse files Browse the repository at this point in the history
  • Loading branch information
jfarr authored Mar 3, 2020
1 parent b853016 commit 873f689
Show file tree
Hide file tree
Showing 12 changed files with 419 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kinesis;

import com.amazonaws.AmazonClientException;

/** Thrown when the Kinesis client was throttled due to rate limits. */
class KinesisClientThrottledException extends TransientKinesisException {

public KinesisClientThrottledException(String s, AmazonClientException e) {
super(s, e);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
Expand Down Expand Up @@ -95,7 +96,6 @@
*
* <pre>{@code
* public class MyCustomKinesisClientProvider implements AWSClientsProvider {
* {@literal @}Override
* public AmazonKinesis getKinesisClient() {
* // set up your client here
* }
Expand Down Expand Up @@ -149,20 +149,17 @@
* this.customWatermarkPolicy = new WatermarkPolicyFactory.CustomWatermarkPolicy(WatermarkParameters.create());
* }
*
* @Override
* public Instant getWatermark() {
* return customWatermarkPolicy.getWatermark();
* }
*
* @Override
* public void update(KinesisRecord record) {
* customWatermarkPolicy.update(record);
* }
* }
*
* // custom factory
* class MyCustomPolicyFactory implements WatermarkPolicyFactory {
* @Override
* public WatermarkPolicy createWatermarkPolicy() {
* return new MyCustomPolicy();
* }
Expand All @@ -174,6 +171,69 @@
* .withCustomWatermarkPolicy(new MyCustomPolicyFactory())
* }</pre>
*
* <p>By default Kinesis IO will poll the Kinesis getRecords() API as fast as possible which may
* lead to excessive read throttling. To limit the rate of getRecords() calls you can set a rate
* limit policy. For example, the default fixed delay policy will limit the rate to one API call per
* second per shard:
*
* <pre>{@code
* p.apply(KinesisIO.read()
* .withStreamName("streamName")
* .withInitialPositionInStream(InitialPositionInStream.LATEST)
* .withFixedDelayRateLimitPolicy())
* }</pre>
*
* <p>You can also use a fixed delay policy with a specified delay interval, for example:
*
* <pre>{@code
* p.apply(KinesisIO.read()
* .withStreamName("streamName")
* .withInitialPositionInStream(InitialPositionInStream.LATEST)
* .withFixedDelayRateLimitPolicy(Duration.millis(500))
* }</pre>
*
* <p>If you need to change the polling interval of a Kinesis pipeline at runtime, for example to
* compensate for adding and removing additional consumers to the stream, then you can supply the
* delay interval as a function so that you can obtain the current delay interval from some external
* source:
*
* <pre>{@code
* p.apply(KinesisIO.read()
* .withStreamName("streamName")
* .withInitialPositionInStream(InitialPositionInStream.LATEST)
* .withDynamicDelayRateLimitPolicy(() -> Duration.millis(<some delay interval>))
* }</pre>
*
* <p>Finally, you can create a custom rate limit policy that responds to successful read calls
* and/or read throttling exceptions with your own rate-limiting logic:
*
* <pre>{@code
* // custom policy
* public class MyCustomPolicy implements RateLimitPolicy {
*
* public void onSuccess(List<KinesisRecord> records) throws InterruptedException {
* // handle successful getRecords() call
* }
*
* public void onThrottle(KinesisClientThrottledException e) throws InterruptedException {
* // handle Kinesis read throttling exception
* }
* }
*
* // custom factory
* class MyCustomPolicyFactory implements RateLimitPolicyFactory {
*
* public RateLimitPolicy getRateLimitPolicy() {
* return new MyCustomPolicy();
* }
* }
*
* p.apply(KinesisIO.read()
* .withStreamName("streamName")
* .withInitialPositionInStream(InitialPositionInStream.LATEST)
* .withCustomRateLimitPolicy(new MyCustomPolicyFactory())
* }</pre>
*
* <h3>Writing to Kinesis</h3>
*
* <p>Example usage:
Expand Down Expand Up @@ -240,6 +300,7 @@ public static Read read() {
.setMaxNumRecords(Long.MAX_VALUE)
.setUpToDateThreshold(Duration.ZERO)
.setWatermarkPolicyFactory(WatermarkPolicyFactory.withArrivalTimePolicy())
.setRateLimitPolicyFactory(RateLimitPolicyFactory.withoutLimiter())
.setMaxCapacityPerShard(ShardReadersPool.DEFAULT_CAPACITY_PER_SHARD)
.build();
}
Expand Down Expand Up @@ -274,6 +335,8 @@ public abstract static class Read extends PTransform<PBegin, PCollection<Kinesis

abstract WatermarkPolicyFactory getWatermarkPolicyFactory();

abstract RateLimitPolicyFactory getRateLimitPolicyFactory();

abstract Integer getMaxCapacityPerShard();

abstract Builder toBuilder();
Expand All @@ -297,6 +360,8 @@ abstract static class Builder {

abstract Builder setWatermarkPolicyFactory(WatermarkPolicyFactory watermarkPolicyFactory);

abstract Builder setRateLimitPolicyFactory(RateLimitPolicyFactory rateLimitPolicyFactory);

abstract Builder setMaxCapacityPerShard(Integer maxCapacity);

abstract Read build();
Expand Down Expand Up @@ -426,6 +491,45 @@ public Read withCustomWatermarkPolicy(WatermarkPolicyFactory watermarkPolicyFact
return toBuilder().setWatermarkPolicyFactory(watermarkPolicyFactory).build();
}

/** Specifies a fixed delay rate limit policy with the default delay of 1 second. */
public Read withFixedDelayRateLimitPolicy() {
return toBuilder().setRateLimitPolicyFactory(RateLimitPolicyFactory.withFixedDelay()).build();
}

/**
* Specifies a fixed delay rate limit policy with the given delay.
*
* @param delay Denotes the fixed delay duration.
*/
public Read withFixedDelayRateLimitPolicy(Duration delay) {
checkArgument(delay != null, "delay cannot be null");
return toBuilder()
.setRateLimitPolicyFactory(RateLimitPolicyFactory.withFixedDelay(delay))
.build();
}

/**
* Specifies a dynamic delay rate limit policy with the given function being called at each
* polling interval to get the next delay value. This can be used to change the polling interval
* of a running pipeline based on some external configuration source, for example.
*
* @param delay The function to invoke to get the next delay duration.
*/
public Read withDynamicDelayRateLimitPolicy(Supplier<Duration> delay) {
checkArgument(delay != null, "delay cannot be null");
return toBuilder().setRateLimitPolicyFactory(RateLimitPolicyFactory.withDelay(delay)).build();
}

/**
* Specifies the {@code RateLimitPolicyFactory} for a custom rate limiter.
*
* @param rateLimitPolicyFactory Custom rate limit policy factory.
*/
public Read withCustomRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) {
checkArgument(rateLimitPolicyFactory != null, "rateLimitPolicyFactory cannot be null");
return toBuilder().setRateLimitPolicyFactory(rateLimitPolicyFactory).build();
}

/** Specifies the maximum number of messages per one shard. */
public Read withMaxCapacityPerShard(Integer maxCapacity) {
checkArgument(maxCapacity > 0, "maxCapacity must be positive, but was: %s", maxCapacity);
Expand All @@ -442,6 +546,7 @@ public PCollection<KinesisRecord> expand(PBegin input) {
getInitialPosition(),
getUpToDateThreshold(),
getWatermarkPolicyFactory(),
getRateLimitPolicyFactory(),
getRequestRecordsLimit(),
getMaxCapacityPerShard()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
private final KinesisSource source;
private final CheckpointGenerator initialCheckpointGenerator;
private final WatermarkPolicyFactory watermarkPolicyFactory;
private final RateLimitPolicyFactory rateLimitPolicyFactory;
private final Duration upToDateThreshold;
private final Duration backlogBytesCheckThreshold;
private CustomOptional<KinesisRecord> currentRecord = CustomOptional.absent();
Expand All @@ -53,13 +54,15 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
CheckpointGenerator initialCheckpointGenerator,
KinesisSource source,
WatermarkPolicyFactory watermarkPolicyFactory,
RateLimitPolicyFactory rateLimitPolicyFactory,
Duration upToDateThreshold,
Integer maxCapacityPerShard) {
this(
kinesis,
initialCheckpointGenerator,
source,
watermarkPolicyFactory,
rateLimitPolicyFactory,
upToDateThreshold,
Duration.standardSeconds(30),
maxCapacityPerShard);
Expand All @@ -70,13 +73,15 @@ class KinesisReader extends UnboundedSource.UnboundedReader<KinesisRecord> {
CheckpointGenerator initialCheckpointGenerator,
KinesisSource source,
WatermarkPolicyFactory watermarkPolicyFactory,
RateLimitPolicyFactory rateLimitPolicyFactory,
Duration upToDateThreshold,
Duration backlogBytesCheckThreshold,
Integer maxCapacityPerShard) {
this.kinesis = checkNotNull(kinesis, "kinesis");
this.initialCheckpointGenerator =
checkNotNull(initialCheckpointGenerator, "initialCheckpointGenerator");
this.watermarkPolicyFactory = watermarkPolicyFactory;
this.rateLimitPolicyFactory = rateLimitPolicyFactory;
this.source = source;
this.upToDateThreshold = upToDateThreshold;
this.backlogBytesCheckThreshold = backlogBytesCheckThreshold;
Expand Down Expand Up @@ -185,6 +190,7 @@ ShardReadersPool createShardReadersPool() throws TransientKinesisException {
kinesis,
initialCheckpointGenerator.generate(kinesis),
watermarkPolicyFactory,
rateLimitPolicyFactory,
maxCapacityPerShard);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi
private final String streamName;
private final Duration upToDateThreshold;
private final WatermarkPolicyFactory watermarkPolicyFactory;
private final RateLimitPolicyFactory rateLimitPolicyFactory;
private CheckpointGenerator initialCheckpointGenerator;
private final Integer limit;
private final Integer maxCapacityPerShard;
Expand All @@ -48,6 +49,7 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi
StartingPoint startingPoint,
Duration upToDateThreshold,
WatermarkPolicyFactory watermarkPolicyFactory,
RateLimitPolicyFactory rateLimitPolicyFactory,
Integer limit,
Integer maxCapacityPerShard) {
this(
Expand All @@ -56,6 +58,7 @@ class KinesisSource extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoi
streamName,
upToDateThreshold,
watermarkPolicyFactory,
rateLimitPolicyFactory,
limit,
maxCapacityPerShard);
}
Expand All @@ -66,13 +69,15 @@ private KinesisSource(
String streamName,
Duration upToDateThreshold,
WatermarkPolicyFactory watermarkPolicyFactory,
RateLimitPolicyFactory rateLimitPolicyFactory,
Integer limit,
Integer maxCapacityPerShard) {
this.awsClientsProvider = awsClientsProvider;
this.initialCheckpointGenerator = initialCheckpoint;
this.streamName = streamName;
this.upToDateThreshold = upToDateThreshold;
this.watermarkPolicyFactory = watermarkPolicyFactory;
this.rateLimitPolicyFactory = rateLimitPolicyFactory;
this.limit = limit;
this.maxCapacityPerShard = maxCapacityPerShard;
validate();
Expand All @@ -98,6 +103,7 @@ public List<KinesisSource> split(int desiredNumSplits, PipelineOptions options)
streamName,
upToDateThreshold,
watermarkPolicyFactory,
rateLimitPolicyFactory,
limit,
maxCapacityPerShard));
}
Expand Down Expand Up @@ -126,6 +132,7 @@ public UnboundedReader<KinesisRecord> createReader(
checkpointGenerator,
this,
watermarkPolicyFactory,
rateLimitPolicyFactory,
upToDateThreshold,
maxCapacityPerShard);
}
Expand All @@ -139,6 +146,8 @@ public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() {
public void validate() {
checkNotNull(awsClientsProvider);
checkNotNull(initialCheckpointGenerator);
checkNotNull(watermarkPolicyFactory);
checkNotNull(rateLimitPolicyFactory);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.kinesis;

import java.util.List;

public interface RateLimitPolicy {

/**
* Called after Kinesis records are successfully retrieved.
*
* @param records The list of retrieved records.
*/
default void onSuccess(List<KinesisRecord> records) throws InterruptedException {}

/**
* Called after the Kinesis client is throttled.
*
* @param e The {@code KinesisClientThrottledException} thrown by the client.
*/
default void onThrottle(KinesisClientThrottledException e) throws InterruptedException {}
}
Loading

0 comments on commit 873f689

Please sign in to comment.