From ab9af72aeedea78dc57e172f4607a69c7d0c899e Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 30 Apr 2024 19:16:00 -0400 Subject: [PATCH] Revert "Add Redistribute translation to Samza runner" This reverts commit 8f1d3da4399cff02d082326a0dc91b8aebec2f1b. --- .../RedistributeByKeyTranslator.java | 64 ------------------- .../translation/ReshuffleTranslator.java | 14 +--- .../translation/SamzaPipelineTranslator.java | 1 - 3 files changed, 2 insertions(+), 77 deletions(-) delete mode 100644 runners/samza/src/main/java/org/apache/beam/runners/samza/translation/RedistributeByKeyTranslator.java diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/RedistributeByKeyTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/RedistributeByKeyTranslator.java deleted file mode 100644 index e59d74abe38f..000000000000 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/RedistributeByKeyTranslator.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.samza.translation; - -import com.google.auto.service.AutoService; -import org.apache.beam.model.pipeline.v1.RunnerApi; -import org.apache.beam.sdk.runners.TransformHierarchy; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.util.construction.NativeTransforms; -import org.apache.beam.sdk.util.construction.graph.PipelineNode; -import org.apache.beam.sdk.util.construction.graph.QueryablePipeline; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -/** - * Translates Reshuffle transform into Samza's native partitionBy operator, which will partition - * each incoming message by the key into a Task corresponding to that key. - */ -public class RedistributeByKeyTranslator - implements TransformTranslator>, PCollection>>> { - - private final ReshuffleTranslator reshuffleTranslator = - new ReshuffleTranslator<>("rdstr-"); - - @Override - public void translate( - PTransform>, PCollection>> transform, - TransformHierarchy.Node node, - TranslationContext ctx) { - reshuffleTranslator.translate(transform, node, ctx); - } - - @Override - public void translatePortable( - PipelineNode.PTransformNode transform, - QueryablePipeline pipeline, - PortableTranslationContext ctx) { - reshuffleTranslator.translatePortable(transform, pipeline, ctx); - } - - /** Predicate to determine whether a URN is a Samza native transform. */ - @AutoService(NativeTransforms.IsNativeTransform.class) - public static class IsSamzaNativeTransform implements NativeTransforms.IsNativeTransform { - @Override - public boolean test(RunnerApi.PTransform pTransform) { - return false; - } - } -} diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java index bfcbd3edae40..0bfe935df6fc 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/ReshuffleTranslator.java @@ -42,16 +42,6 @@ public class ReshuffleTranslator implements TransformTranslator>, PCollection>>> { - private final String prefix; - - ReshuffleTranslator(String prefix) { - this.prefix = prefix; - } - - ReshuffleTranslator() { - this("rshfl-"); - } - @Override public void translate( PTransform>, PCollection>> transform, @@ -70,7 +60,7 @@ public void translate( inputStream, inputCoder.getKeyCoder(), elementCoder, - prefix + ctx.getTransformId(), + "rshfl-" + ctx.getTransformId(), ctx.getPipelineOptions().getMaxSourceParallelism() > 1); ctx.registerMessageStream(output, outputStream); @@ -93,7 +83,7 @@ public void translatePortable( inputStream, ((KvCoder) windowedInputCoder.getValueCoder()).getKeyCoder(), windowedInputCoder, - prefix + ctx.getTransformId(), + "rshfl-" + ctx.getTransformId(), ctx.getPipelineOptions().getMaxSourceParallelism() > 1); ctx.registerMessageStream(outputId, outputStream); diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java index f29588d277ad..e90e63317708 100644 --- a/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java +++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/translation/SamzaPipelineTranslator.java @@ -179,7 +179,6 @@ public Map> getTransformTranslators() { return ImmutableMap.>builder() .put(PTransformTranslation.READ_TRANSFORM_URN, new ReadTranslator<>()) .put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslator<>()) - .put(PTransformTranslation.REDISTRIBUTE_BY_KEY_URN, new RedistributeByKeyTranslator<>()) .put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoBoundMultiTranslator<>()) .put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslator<>()) .put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new GroupByKeyTranslator<>())