Skip to content

Commit

Permalink
[BEAM-10137] Add KinesisIO for cross-language usage with python wrapp…
Browse files Browse the repository at this point in the history
…er (#12297)
  • Loading branch information
Piotr Szuberski authored Sep 2, 2020
1 parent ab597ff commit 0335ba5
Show file tree
Hide file tree
Showing 7 changed files with 1,032 additions and 44 deletions.
38 changes: 38 additions & 0 deletions sdks/java/io/kinesis/expansion-service/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

apply plugin: 'org.apache.beam.module'
apply plugin: 'application'
mainClassName = "org.apache.beam.sdk.expansion.service.ExpansionService"

applyJavaNature(
enableChecker: true,
automaticModuleName: 'org.apache.beam.sdk.io.kinesis.expansion.service',
exportJavadoc: false,
validateShadowJar: false,
shadowClosure: {},
)

description = "Apache Beam :: SDKs :: Java :: IO :: Kinesis :: Expansion Service"
ext.summary = "Expansion service serving KinesisIO"

dependencies {
compile project(":sdks:java:expansion-service")
compile project(":sdks:java:io:kinesis")
runtime library.java.slf4j_jdk14
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,18 @@
import java.util.function.Supplier;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.Read.Unbounded;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
Expand Down Expand Up @@ -295,14 +300,16 @@ public final class KinesisIO {
private static final int DEFAULT_NUM_RETRIES = 6;

/** Returns a new {@link Read} transform for reading from Kinesis. */
public static Read read() {
return new AutoValue_KinesisIO_Read.Builder()
.setMaxNumRecords(Long.MAX_VALUE)
.setUpToDateThreshold(Duration.ZERO)
.setWatermarkPolicyFactory(WatermarkPolicyFactory.withArrivalTimePolicy())
.setRateLimitPolicyFactory(RateLimitPolicyFactory.withoutLimiter())
.setMaxCapacityPerShard(ShardReadersPool.DEFAULT_CAPACITY_PER_SHARD)
.build();
public static Read<KinesisRecord> read() {
return Read.newBuilder().setCoder(KinesisRecordCoder.of()).build();
}

/**
* A {@link PTransform} to read from Kinesis stream as bytes without metadata and returns a {@link
* PCollection} of {@link byte[]}.
*/
public static Read<byte[]> readData() {
return Read.newBuilder(KinesisRecord::getDataAsBytes).setCoder(ByteArrayCoder.of()).build();
}

/** A {@link PTransform} writing data to Kinesis. */
Expand All @@ -312,7 +319,7 @@ public static Write write() {

/** Implementation of {@link #read}. */
@AutoValue
public abstract static class Read extends PTransform<PBegin, PCollection<KinesisRecord>> {
public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {

abstract @Nullable String getStreamName();

Expand All @@ -334,49 +341,71 @@ public abstract static class Read extends PTransform<PBegin, PCollection<Kinesis

abstract Integer getMaxCapacityPerShard();

abstract Builder toBuilder();
abstract Coder<T> getCoder();

abstract @Nullable SerializableFunction<KinesisRecord, T> getParseFn();

abstract Builder<T> toBuilder();

static <T> Builder<T> newBuilder(SerializableFunction<KinesisRecord, T> parseFn) {
return new AutoValue_KinesisIO_Read.Builder<T>()
.setParseFn(parseFn)
.setMaxNumRecords(Long.MAX_VALUE)
.setUpToDateThreshold(Duration.ZERO)
.setWatermarkPolicyFactory(WatermarkPolicyFactory.withArrivalTimePolicy())
.setRateLimitPolicyFactory(RateLimitPolicyFactory.withoutLimiter())
.setMaxCapacityPerShard(ShardReadersPool.DEFAULT_CAPACITY_PER_SHARD);
}

static Builder<KinesisRecord> newBuilder() {
return newBuilder(x -> x);
}

@AutoValue.Builder
abstract static class Builder {
abstract static class Builder<T> {

abstract Builder setStreamName(String streamName);
abstract Builder<T> setStreamName(String streamName);

abstract Builder setInitialPosition(StartingPoint startingPoint);
abstract Builder<T> setInitialPosition(StartingPoint startingPoint);

abstract Builder setAWSClientsProvider(AWSClientsProvider clientProvider);
abstract Builder<T> setAWSClientsProvider(AWSClientsProvider clientProvider);

abstract Builder<T> setMaxNumRecords(long maxNumRecords);

abstract Builder<T> setMaxReadTime(Duration maxReadTime);

abstract Builder setMaxNumRecords(long maxNumRecords);
abstract Builder<T> setUpToDateThreshold(Duration upToDateThreshold);

abstract Builder setMaxReadTime(Duration maxReadTime);
abstract Builder<T> setRequestRecordsLimit(Integer limit);

abstract Builder setUpToDateThreshold(Duration upToDateThreshold);
abstract Builder<T> setWatermarkPolicyFactory(WatermarkPolicyFactory watermarkPolicyFactory);

abstract Builder setRequestRecordsLimit(Integer limit);
abstract Builder<T> setRateLimitPolicyFactory(RateLimitPolicyFactory rateLimitPolicyFactory);

abstract Builder setWatermarkPolicyFactory(WatermarkPolicyFactory watermarkPolicyFactory);
abstract Builder<T> setMaxCapacityPerShard(Integer maxCapacity);

abstract Builder setRateLimitPolicyFactory(RateLimitPolicyFactory rateLimitPolicyFactory);
abstract Builder<T> setParseFn(SerializableFunction<KinesisRecord, T> parseFn);

abstract Builder setMaxCapacityPerShard(Integer maxCapacity);
abstract Builder<T> setCoder(Coder<T> coder);

abstract Read build();
abstract Read<T> build();
}

/** Specify reading from streamName. */
public Read withStreamName(String streamName) {
public Read<T> withStreamName(String streamName) {
return toBuilder().setStreamName(streamName).build();
}

/** Specify reading from some initial position in stream. */
public Read withInitialPositionInStream(InitialPositionInStream initialPosition) {
public Read<T> withInitialPositionInStream(InitialPositionInStream initialPosition) {
return toBuilder().setInitialPosition(new StartingPoint(initialPosition)).build();
}

/**
* Specify reading beginning at given {@link Instant}. This {@link Instant} must be in the past,
* i.e. before {@link Instant#now()}.
*/
public Read withInitialTimestampInStream(Instant initialTimestamp) {
public Read<T> withInitialTimestampInStream(Instant initialTimestamp) {
return toBuilder().setInitialPosition(new StartingPoint(initialTimestamp)).build();
}

Expand All @@ -386,7 +415,7 @@ public Read withInitialTimestampInStream(Instant initialTimestamp) {
* communication with Kinesis. You should use this method if {@link
* Read#withAWSClientsProvider(String, String, Regions)} does not suit your needs.
*/
public Read withAWSClientsProvider(AWSClientsProvider awsClientsProvider) {
public Read<T> withAWSClientsProvider(AWSClientsProvider awsClientsProvider) {
return toBuilder().setAWSClientsProvider(awsClientsProvider).build();
}

Expand All @@ -395,7 +424,8 @@ public Read withAWSClientsProvider(AWSClientsProvider awsClientsProvider) {
* sophisticated credential protocol, then you should look at {@link
* Read#withAWSClientsProvider(AWSClientsProvider)}.
*/
public Read withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Regions region) {
public Read<T> withAWSClientsProvider(
String awsAccessKey, String awsSecretKey, Regions region) {
return withAWSClientsProvider(awsAccessKey, awsSecretKey, region, null);
}

Expand All @@ -407,7 +437,7 @@ public Read withAWSClientsProvider(String awsAccessKey, String awsSecretKey, Reg
* <p>The {@code serviceEndpoint} sets an alternative service host. This is useful to execute
* the tests with a kinesis service emulator.
*/
public Read withAWSClientsProvider(
public Read<T> withAWSClientsProvider(
String awsAccessKey, String awsSecretKey, Regions region, String serviceEndpoint) {
return withAWSClientsProvider(
new BasicKinesisProvider(awsAccessKey, awsSecretKey, region, serviceEndpoint));
Expand All @@ -424,7 +454,7 @@ public Read withAWSClientsProvider(
* <p>The {@code verifyCertificate} disables or enables certificate verification. Never set it
* to false in production.
*/
public Read withAWSClientsProvider(
public Read<T> withAWSClientsProvider(
String awsAccessKey,
String awsSecretKey,
Regions region,
Expand All @@ -436,14 +466,14 @@ public Read withAWSClientsProvider(
}

/** Specifies to read at most a given number of records. */
public Read withMaxNumRecords(long maxNumRecords) {
public Read<T> withMaxNumRecords(long maxNumRecords) {
checkArgument(
maxNumRecords > 0, "maxNumRecords must be positive, but was: %s", maxNumRecords);
return toBuilder().setMaxNumRecords(maxNumRecords).build();
}

/** Specifies to read records during {@code maxReadTime}. */
public Read withMaxReadTime(Duration maxReadTime) {
public Read<T> withMaxReadTime(Duration maxReadTime) {
checkArgument(maxReadTime != null, "maxReadTime can not be null");
return toBuilder().setMaxReadTime(maxReadTime).build();
}
Expand All @@ -454,7 +484,7 @@ public Read withMaxReadTime(Duration maxReadTime) {
* decide to scale the amount of resources allocated to the pipeline in order to speed up
* ingestion.
*/
public Read withUpToDateThreshold(Duration upToDateThreshold) {
public Read<T> withUpToDateThreshold(Duration upToDateThreshold) {
checkArgument(upToDateThreshold != null, "upToDateThreshold can not be null");
return toBuilder().setUpToDateThreshold(upToDateThreshold).build();
}
Expand All @@ -465,14 +495,14 @@ public Read withUpToDateThreshold(Duration upToDateThreshold) {
* prevent shard overloading. More details can be found here: <a
* href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html">API_GetRecords</a>
*/
public Read withRequestRecordsLimit(int limit) {
public Read<T> withRequestRecordsLimit(int limit) {
checkArgument(limit > 0, "limit must be positive, but was: %s", limit);
checkArgument(limit <= 10_000, "limit must be up to 10,000, but was: %s", limit);
return toBuilder().setRequestRecordsLimit(limit).build();
}

/** Specifies the {@code WatermarkPolicyFactory} as ArrivalTimeWatermarkPolicyFactory. */
public Read withArrivalTimeWatermarkPolicy() {
public Read<T> withArrivalTimeWatermarkPolicy() {
return toBuilder()
.setWatermarkPolicyFactory(WatermarkPolicyFactory.withArrivalTimePolicy())
.build();
Expand All @@ -484,15 +514,15 @@ public Read withArrivalTimeWatermarkPolicy() {
* <p>{@param watermarkIdleDurationThreshold} Denotes the duration for which the watermark can
* be idle.
*/
public Read withArrivalTimeWatermarkPolicy(Duration watermarkIdleDurationThreshold) {
public Read<T> withArrivalTimeWatermarkPolicy(Duration watermarkIdleDurationThreshold) {
return toBuilder()
.setWatermarkPolicyFactory(
WatermarkPolicyFactory.withArrivalTimePolicy(watermarkIdleDurationThreshold))
.build();
}

/** Specifies the {@code WatermarkPolicyFactory} as ProcessingTimeWatermarkPolicyFactory. */
public Read withProcessingTimeWatermarkPolicy() {
public Read<T> withProcessingTimeWatermarkPolicy() {
return toBuilder()
.setWatermarkPolicyFactory(WatermarkPolicyFactory.withProcessingTimePolicy())
.build();
Expand All @@ -503,13 +533,13 @@ public Read withProcessingTimeWatermarkPolicy() {
*
* @param watermarkPolicyFactory Custom Watermark policy factory.
*/
public Read withCustomWatermarkPolicy(WatermarkPolicyFactory watermarkPolicyFactory) {
public Read<T> withCustomWatermarkPolicy(WatermarkPolicyFactory watermarkPolicyFactory) {
checkArgument(watermarkPolicyFactory != null, "watermarkPolicyFactory cannot be null");
return toBuilder().setWatermarkPolicyFactory(watermarkPolicyFactory).build();
}

/** Specifies a fixed delay rate limit policy with the default delay of 1 second. */
public Read withFixedDelayRateLimitPolicy() {
public Read<T> withFixedDelayRateLimitPolicy() {
return toBuilder().setRateLimitPolicyFactory(RateLimitPolicyFactory.withFixedDelay()).build();
}

Expand All @@ -518,7 +548,7 @@ public Read withFixedDelayRateLimitPolicy() {
*
* @param delay Denotes the fixed delay duration.
*/
public Read withFixedDelayRateLimitPolicy(Duration delay) {
public Read<T> withFixedDelayRateLimitPolicy(Duration delay) {
checkArgument(delay != null, "delay cannot be null");
return toBuilder()
.setRateLimitPolicyFactory(RateLimitPolicyFactory.withFixedDelay(delay))
Expand All @@ -532,7 +562,7 @@ public Read withFixedDelayRateLimitPolicy(Duration delay) {
*
* @param delay The function to invoke to get the next delay duration.
*/
public Read withDynamicDelayRateLimitPolicy(Supplier<Duration> delay) {
public Read<T> withDynamicDelayRateLimitPolicy(Supplier<Duration> delay) {
checkArgument(delay != null, "delay cannot be null");
return toBuilder().setRateLimitPolicyFactory(RateLimitPolicyFactory.withDelay(delay)).build();
}
Expand All @@ -542,19 +572,19 @@ public Read withDynamicDelayRateLimitPolicy(Supplier<Duration> delay) {
*
* @param rateLimitPolicyFactory Custom rate limit policy factory.
*/
public Read withCustomRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) {
public Read<T> withCustomRateLimitPolicy(RateLimitPolicyFactory rateLimitPolicyFactory) {
checkArgument(rateLimitPolicyFactory != null, "rateLimitPolicyFactory cannot be null");
return toBuilder().setRateLimitPolicyFactory(rateLimitPolicyFactory).build();
}

/** Specifies the maximum number of messages per one shard. */
public Read withMaxCapacityPerShard(Integer maxCapacity) {
public Read<T> withMaxCapacityPerShard(Integer maxCapacity) {
checkArgument(maxCapacity > 0, "maxCapacity must be positive, but was: %s", maxCapacity);
return toBuilder().setMaxCapacityPerShard(maxCapacity).build();
}

@Override
public PCollection<KinesisRecord> expand(PBegin input) {
public PCollection<T> expand(PBegin input) {
Unbounded<KinesisRecord> unbounded =
org.apache.beam.sdk.io.Read.from(
new KinesisSource(
Expand All @@ -574,7 +604,10 @@ public PCollection<KinesisRecord> expand(PBegin input) {
unbounded.withMaxReadTime(getMaxReadTime()).withMaxNumRecords(getMaxNumRecords());
}

return input.apply(transform);
return input
.apply(transform)
.apply(MapElements.into(new TypeDescriptor<T>() {}).via(getParseFn()))
.setCoder(getCoder());
}
}

Expand Down
Loading

0 comments on commit 0335ba5

Please sign in to comment.