diff --git a/.github/trigger_files/beam_PostCommit_Java_PVR_Samza.json b/.github/trigger_files/beam_PostCommit_Java_PVR_Samza.json index b970762c8397..a937ef2fc07d 100644 --- a/.github/trigger_files/beam_PostCommit_Java_PVR_Samza.json +++ b/.github/trigger_files/beam_PostCommit_Java_PVR_Samza.json @@ -1,4 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test" + "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", + "https://github.com/apache/beam/pull/31270": "re-adds specialized Samza translation of Redistribute" } diff --git a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json index b970762c8397..1d4a0589a276 100644 --- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json +++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Samza.json @@ -1,4 +1,5 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run", - "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test" + "https://github.com/apache/beam/pull/31156": "noting that PR #31156 should run this test", + "https://github.com/apache/beam/pull/31270": "re-add specialized Samza translation of Redistribute" } diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/RedistributeByKeyTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/RedistributeByKeyTranslator.java new file mode 100644 index 000000000000..e59d74abe38f --- /dev/null +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/RedistributeByKeyTranslator.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.samza.translation; + +import com.google.auto.service.AutoService; +import org.apache.beam.model.pipeline.v1.RunnerApi; +import org.apache.beam.sdk.runners.TransformHierarchy; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.util.construction.NativeTransforms; +import org.apache.beam.sdk.util.construction.graph.PipelineNode; +import org.apache.beam.sdk.util.construction.graph.QueryablePipeline; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; + +/** + * Translates the Redistribute-by-key transform into Samza's native partitionBy operator, which + * will partition each incoming message by the key into a Task corresponding to that key. 
+ */ +public class RedistributeByKeyTranslator + implements TransformTranslator>, PCollection>>> { + + /** Delegate that performs the actual translation; constructed with the {@code rdstr-} prefix rather than the delegate's no-arg default. */ private final ReshuffleTranslator reshuffleTranslator = + new ReshuffleTranslator<>("rdstr-"); + + /** Non-portable translation path: forwards the transform, node and context unchanged to the wrapped {@code ReshuffleTranslator}. */ @Override + public void translate( + PTransform>, PCollection>> transform, + TransformHierarchy.Node node, + TranslationContext ctx) { + reshuffleTranslator.translate(transform, node, ctx); + } + + /** Portable translation path: forwards the transform node, pipeline and context unchanged to the wrapped {@code ReshuffleTranslator}. */ @Override + public void translatePortable( + PipelineNode.PTransformNode transform, + QueryablePipeline pipeline, + PortableTranslationContext ctx) { + reshuffleTranslator.translatePortable(transform, pipeline, ctx); + } + + /** Predicate to determine whether a URN is a Samza native transform. This implementation returns {@code false} for every transform it is given. */ + @AutoService(NativeTransforms.IsNativeTransform.class) + public static class IsSamzaNativeTransform implements NativeTransforms.IsNativeTransform { + @Override + public boolean test(RunnerApi.PTransform pTransform) { + return false; + } + } +} diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java index 0bfe935df6fc..bfcbd3edae40 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java @@ -42,6 +42,16 @@ public class ReshuffleTranslator implements TransformTranslator>, PCollection>>> { + /** Prefix that the translate methods prepend to {@code ctx.getTransformId()}. */ private final String prefix; + + /** Creates a translator that uses the given prefix. */ ReshuffleTranslator(String prefix) { + this.prefix = prefix; + } + + /** Creates a translator that uses the {@code rshfl-} prefix. */ ReshuffleTranslator() { + this("rshfl-"); + } + @Override public void translate( PTransform>, PCollection>> transform, @@ -60,7 +70,7 @@ public void translate( inputStream, inputCoder.getKeyCoder(), elementCoder, - "rshfl-" + ctx.getTransformId(), + prefix + ctx.getTransformId(), ctx.getPipelineOptions().getMaxSourceParallelism() > 1); ctx.registerMessageStream(output, outputStream); @@ -83,7 +93,7 
@@ public void translatePortable( inputStream, ((KvCoder) windowedInputCoder.getValueCoder()).getKeyCoder(), windowedInputCoder, - "rshfl-" + ctx.getTransformId(), + prefix + ctx.getTransformId(), ctx.getPipelineOptions().getMaxSourceParallelism() > 1); ctx.registerMessageStream(outputId, outputStream); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java index e90e63317708..f29588d277ad 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java @@ -179,6 +179,7 @@ public Map> getTransformTranslators() { return ImmutableMap.>builder() .put(PTransformTranslation.READ_TRANSFORM_URN, new ReadTranslator<>()) .put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslator<>()) + .put(PTransformTranslation.REDISTRIBUTE_BY_KEY_URN, new RedistributeByKeyTranslator<>()) .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoBoundMultiTranslator<>()) .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslator<>()) .put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new GroupByKeyTranslator<>())