From ece456c1a91d1b2af50b5a9e9f88eb2a701ccb78 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 30 Apr 2024 19:15:57 -0400 Subject: [PATCH] Revert "Add Redistribute translation to Spark runner" This reverts commit 21e3fa1eedff5c7a79a8464759c4a47fe1e3c0e4. --- .../translation/TransformTranslator.java | 66 ----------------- .../StreamingTransformTranslator.java | 70 ------------------- 2 files changed, 136 deletions(-) diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index bf8ce639f88a..5dc553faab5b 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -50,7 +50,6 @@ import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Redistribute; import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; @@ -767,69 +766,6 @@ public String toNativeString() { }; } - private static - TransformEvaluator> redistributeByKey() { - return new TransformEvaluator>() { - @Override - public void evaluate( - Redistribute.RedistributeByKey transform, EvaluationContext context) { - @SuppressWarnings("unchecked") - JavaRDD>> inRDD = - ((BoundedDataset>) context.borrowDataset(transform)).getRDD(); - @SuppressWarnings("unchecked") - final WindowingStrategy windowingStrategy = - (WindowingStrategy) context.getInput(transform).getWindowingStrategy(); - final KvCoder coder = (KvCoder) context.getInput(transform).getCoder(); - @SuppressWarnings("unchecked") - final WindowFn windowFn = (WindowFn) windowingStrategy.getWindowFn(); - - final WindowedValue.WindowedValueCoder> wvCoder = - WindowedValue.FullWindowedValueCoder.of(coder, windowFn.windowCoder()); - - JavaRDD>> reshuffled = - GroupCombineFunctions.reshuffle(inRDD, wvCoder); - - context.putDataset(transform, new BoundedDataset<>(reshuffled)); - } - - @Override - public String toNativeString() { - return "repartition(...)"; - } - }; - } - - private static - TransformEvaluator> redistributeArbitrarily() { - return new TransformEvaluator>() { - @Override - public void evaluate( - Redistribute.RedistributeArbitrarily transform, EvaluationContext context) { - @SuppressWarnings("unchecked") - JavaRDD> inRDD = - ((BoundedDataset) context.borrowDataset(transform)).getRDD(); - @SuppressWarnings("unchecked") - final WindowingStrategy windowingStrategy = - (WindowingStrategy) context.getInput(transform).getWindowingStrategy(); - final Coder coder = context.getInput(transform).getCoder(); - @SuppressWarnings("unchecked") - final WindowFn windowFn = (WindowFn) windowingStrategy.getWindowFn(); - - final WindowedValue.WindowedValueCoder wvCoder = - WindowedValue.FullWindowedValueCoder.of(coder, windowFn.windowCoder()); - - JavaRDD> reshuffled = GroupCombineFunctions.reshuffle(inRDD, wvCoder); - - context.putDataset(transform, new BoundedDataset<>(reshuffled)); - } - - @Override - public String toNativeString() { - return "repartition(...)"; - } - }; - } - private static @Nullable Partitioner getPartitioner(EvaluationContext context) { Long bundleSize = context.getSerializableOptions().get().as(SparkPipelineOptions.class).getBundleSize(); @@ -852,8 +788,6 @@ public String toNativeString() { EVALUATORS.put(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN, createPCollView()); EVALUATORS.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, window()); EVALUATORS.put(PTransformTranslation.RESHUFFLE_URN, reshuffle()); - EVALUATORS.put(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN, redistributeArbitrarily()); - EVALUATORS.put(PTransformTranslation.REDISTRIBUTE_BY_KEY_URN, redistributeByKey()); } private static @Nullable TransformEvaluator getTranslator(PTransform transform) { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 50f444447fbb..5be8e718dec6 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -64,7 +64,6 @@ import org.apache.beam.sdk.transforms.Impulse; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Redistribute; import org.apache.beam.sdk.transforms.Reshuffle; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -555,73 +554,6 @@ public String toNativeString() { }; } - private static - TransformEvaluator> redistributeByKey() { - return new TransformEvaluator>() { - @Override - public void evaluate( - Redistribute.RedistributeByKey transform, EvaluationContext context) { - @SuppressWarnings("unchecked") - UnboundedDataset> inputDataset = - (UnboundedDataset>) context.borrowDataset(transform); - List streamSources = inputDataset.getStreamSources(); - JavaDStream>> dStream = inputDataset.getDStream(); - final KvCoder coder = (KvCoder) context.getInput(transform).getCoder(); - @SuppressWarnings("unchecked") - final WindowingStrategy windowingStrategy = - (WindowingStrategy) context.getInput(transform).getWindowingStrategy(); - @SuppressWarnings("unchecked") - final WindowFn windowFn = (WindowFn) windowingStrategy.getWindowFn(); - - final WindowedValue.WindowedValueCoder> wvCoder = - WindowedValue.FullWindowedValueCoder.of(coder, windowFn.windowCoder()); - - JavaDStream>> reshuffledStream = - dStream.transform(rdd -> GroupCombineFunctions.reshuffle(rdd, wvCoder)); - - context.putDataset(transform, new UnboundedDataset<>(reshuffledStream, streamSources)); - } - - @Override - public String toNativeString() { - return "repartition(...)"; - } - }; - } - - private static - TransformEvaluator> redistributeArbitrarily() { - return new TransformEvaluator>() { - @Override - public void evaluate( - Redistribute.RedistributeArbitrarily transform, EvaluationContext context) { - @SuppressWarnings("unchecked") - UnboundedDataset inputDataset = (UnboundedDataset) context.borrowDataset(transform); - List streamSources = inputDataset.getStreamSources(); - JavaDStream> dStream = inputDataset.getDStream(); - final Coder coder = context.getInput(transform).getCoder(); - @SuppressWarnings("unchecked") - final WindowingStrategy windowingStrategy = - (WindowingStrategy) context.getInput(transform).getWindowingStrategy(); - @SuppressWarnings("unchecked") - final WindowFn windowFn = (WindowFn) windowingStrategy.getWindowFn(); - - final WindowedValue.WindowedValueCoder wvCoder = - WindowedValue.FullWindowedValueCoder.of(coder, windowFn.windowCoder()); - - JavaDStream> reshuffledStream = - dStream.transform(rdd -> GroupCombineFunctions.reshuffle(rdd, wvCoder)); - - context.putDataset(transform, new UnboundedDataset<>(reshuffledStream, streamSources)); - } - - @Override - public String toNativeString() { - return "repartition(...)"; - } - }; - } - private static final Map> EVALUATORS = new HashMap<>(); static { @@ -634,8 +566,6 @@ public String toNativeString() { EVALUATORS.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, window()); EVALUATORS.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, flattenPColl()); EVALUATORS.put(PTransformTranslation.RESHUFFLE_URN, reshuffle()); - EVALUATORS.put(PTransformTranslation.REDISTRIBUTE_BY_KEY_URN, redistributeByKey()); - EVALUATORS.put(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN, redistributeArbitrarily()); // For testing only EVALUATORS.put(CreateStream.TRANSFORM_URN, createFromQueue()); EVALUATORS.put(PTransformTranslation.TEST_STREAM_TRANSFORM_URN, createFromTestStream());