Skip to content

Commit

Permalink
Revert "Add Redistribute translation to Spark runner"
Browse files Browse the repository at this point in the history
This reverts commit 21e3fa1.
  • Loading branch information
kennknowles committed Apr 30, 2024
1 parent 3b272d2 commit ece456c
Show file tree
Hide file tree
Showing 2 changed files with 0 additions and 136 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Redistribute;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
Expand Down Expand Up @@ -767,69 +766,6 @@ public String toNativeString() {
};
}

/**
 * Creates the evaluator used for {@code Redistribute.RedistributeByKey} on a bounded dataset.
 *
 * <p>The evaluator borrows the input dataset's RDD, assembles a full windowed-value coder from
 * the input's key-value coder and the window coder of its windowing strategy, passes both to
 * {@code GroupCombineFunctions.reshuffle}, and registers the result as the transform's dataset.
 */
private static <K, V, W extends BoundedWindow>
    TransformEvaluator<Redistribute.RedistributeByKey<K, V>> redistributeByKey() {
  return new TransformEvaluator<Redistribute.RedistributeByKey<K, V>>() {
    @Override
    public void evaluate(
        Redistribute.RedistributeByKey<K, V> transform, EvaluationContext context) {
      // Borrow the dataset first, exactly as before, then look at the input's metadata.
      @SuppressWarnings("unchecked")
      BoundedDataset<KV<K, V>> boundedInput =
          (BoundedDataset<KV<K, V>>) context.borrowDataset(transform);
      JavaRDD<WindowedValue<KV<K, V>>> sourceRdd = boundedInput.getRDD();

      final KvCoder<K, V> kvCoder = (KvCoder<K, V>) context.getInput(transform).getCoder();

      @SuppressWarnings("unchecked")
      final WindowingStrategy<?, W> strategy =
          (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy();
      @SuppressWarnings("unchecked")
      final WindowFn<Object, W> fn = (WindowFn<Object, W>) strategy.getWindowFn();

      // Elements are encoded together with their window information.
      final WindowedValue.WindowedValueCoder<KV<K, V>> windowedCoder =
          WindowedValue.FullWindowedValueCoder.of(kvCoder, fn.windowCoder());

      JavaRDD<WindowedValue<KV<K, V>>> redistributed =
          GroupCombineFunctions.reshuffle(sourceRdd, windowedCoder);
      context.putDataset(transform, new BoundedDataset<>(redistributed));
    }

    @Override
    public String toNativeString() {
      return "repartition(...)";
    }
  };
}

/**
 * Creates the evaluator used for {@code Redistribute.RedistributeArbitrarily} on a bounded
 * dataset.
 *
 * <p>The evaluator borrows the input dataset's RDD, assembles a full windowed-value coder from
 * the input's element coder and the window coder of its windowing strategy, passes both to
 * {@code GroupCombineFunctions.reshuffle}, and registers the result as the transform's dataset.
 */
private static <T, W extends BoundedWindow>
    TransformEvaluator<Redistribute.RedistributeArbitrarily<T>> redistributeArbitrarily() {
  return new TransformEvaluator<Redistribute.RedistributeArbitrarily<T>>() {
    @Override
    public void evaluate(
        Redistribute.RedistributeArbitrarily<T> transform, EvaluationContext context) {
      // Borrow the dataset first, exactly as before, then look at the input's metadata.
      @SuppressWarnings("unchecked")
      BoundedDataset<T> boundedInput = (BoundedDataset<T>) context.borrowDataset(transform);
      JavaRDD<WindowedValue<T>> sourceRdd = boundedInput.getRDD();

      final Coder<T> elementCoder = context.getInput(transform).getCoder();

      @SuppressWarnings("unchecked")
      final WindowingStrategy<?, W> strategy =
          (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy();
      @SuppressWarnings("unchecked")
      final WindowFn<Object, W> fn = (WindowFn<Object, W>) strategy.getWindowFn();

      // Elements are encoded together with their window information.
      final WindowedValue.WindowedValueCoder<T> windowedCoder =
          WindowedValue.FullWindowedValueCoder.of(elementCoder, fn.windowCoder());

      JavaRDD<WindowedValue<T>> redistributed =
          GroupCombineFunctions.reshuffle(sourceRdd, windowedCoder);
      context.putDataset(transform, new BoundedDataset<>(redistributed));
    }

    @Override
    public String toNativeString() {
      return "repartition(...)";
    }
  };
}

private static @Nullable Partitioner getPartitioner(EvaluationContext context) {
Long bundleSize =
context.getSerializableOptions().get().as(SparkPipelineOptions.class).getBundleSize();
Expand All @@ -852,8 +788,6 @@ public String toNativeString() {
EVALUATORS.put(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN, createPCollView());
EVALUATORS.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, window());
EVALUATORS.put(PTransformTranslation.RESHUFFLE_URN, reshuffle());
EVALUATORS.put(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN, redistributeArbitrarily());
EVALUATORS.put(PTransformTranslation.REDISTRIBUTE_BY_KEY_URN, redistributeByKey());
}

private static @Nullable TransformEvaluator<?> getTranslator(PTransform<?, ?> transform) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Redistribute;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
Expand Down Expand Up @@ -555,73 +554,6 @@ public String toNativeString() {
};
}

/**
 * Creates the evaluator used for {@code Redistribute.RedistributeByKey} on an unbounded
 * (streaming) dataset.
 *
 * <p>The evaluator borrows the input dataset, assembles a full windowed-value coder from the
 * input's key-value coder and the window coder of its windowing strategy, and applies
 * {@code GroupCombineFunctions.reshuffle} to each RDD of the input DStream. The resulting
 * stream is registered together with the input's original stream sources.
 */
private static <K, V, W extends BoundedWindow>
    TransformEvaluator<Redistribute.RedistributeByKey<K, V>> redistributeByKey() {
  return new TransformEvaluator<Redistribute.RedistributeByKey<K, V>>() {
    @Override
    public void evaluate(
        Redistribute.RedistributeByKey<K, V> transform, EvaluationContext context) {
      @SuppressWarnings("unchecked")
      UnboundedDataset<KV<K, V>> unboundedInput =
          (UnboundedDataset<KV<K, V>>) context.borrowDataset(transform);
      List<Integer> sourceIds = unboundedInput.getStreamSources();
      JavaDStream<WindowedValue<KV<K, V>>> sourceStream = unboundedInput.getDStream();

      final KvCoder<K, V> kvCoder = (KvCoder<K, V>) context.getInput(transform).getCoder();

      @SuppressWarnings("unchecked")
      final WindowingStrategy<?, W> strategy =
          (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy();
      @SuppressWarnings("unchecked")
      final WindowFn<Object, W> fn = (WindowFn<Object, W>) strategy.getWindowFn();

      // Elements are encoded together with their window information.
      final WindowedValue.WindowedValueCoder<KV<K, V>> windowedCoder =
          WindowedValue.FullWindowedValueCoder.of(kvCoder, fn.windowCoder());

      // The reshuffle is applied per RDD of the stream.
      JavaDStream<WindowedValue<KV<K, V>>> redistributedStream =
          sourceStream.transform(batch -> GroupCombineFunctions.reshuffle(batch, windowedCoder));

      context.putDataset(transform, new UnboundedDataset<>(redistributedStream, sourceIds));
    }

    @Override
    public String toNativeString() {
      return "repartition(...)";
    }
  };
}

/**
 * Creates the evaluator used for {@code Redistribute.RedistributeArbitrarily} on an unbounded
 * (streaming) dataset.
 *
 * <p>The evaluator borrows the input dataset, assembles a full windowed-value coder from the
 * input's element coder and the window coder of its windowing strategy, and applies
 * {@code GroupCombineFunctions.reshuffle} to each RDD of the input DStream. The resulting
 * stream is registered together with the input's original stream sources.
 */
private static <T, W extends BoundedWindow>
    TransformEvaluator<Redistribute.RedistributeArbitrarily<T>> redistributeArbitrarily() {
  return new TransformEvaluator<Redistribute.RedistributeArbitrarily<T>>() {
    @Override
    public void evaluate(
        Redistribute.RedistributeArbitrarily<T> transform, EvaluationContext context) {
      @SuppressWarnings("unchecked")
      UnboundedDataset<T> unboundedInput =
          (UnboundedDataset<T>) context.borrowDataset(transform);
      List<Integer> sourceIds = unboundedInput.getStreamSources();
      JavaDStream<WindowedValue<T>> sourceStream = unboundedInput.getDStream();

      final Coder<T> elementCoder = context.getInput(transform).getCoder();

      @SuppressWarnings("unchecked")
      final WindowingStrategy<?, W> strategy =
          (WindowingStrategy<?, W>) context.getInput(transform).getWindowingStrategy();
      @SuppressWarnings("unchecked")
      final WindowFn<Object, W> fn = (WindowFn<Object, W>) strategy.getWindowFn();

      // Elements are encoded together with their window information.
      final WindowedValue.WindowedValueCoder<T> windowedCoder =
          WindowedValue.FullWindowedValueCoder.of(elementCoder, fn.windowCoder());

      // The reshuffle is applied per RDD of the stream.
      JavaDStream<WindowedValue<T>> redistributedStream =
          sourceStream.transform(batch -> GroupCombineFunctions.reshuffle(batch, windowedCoder));

      context.putDataset(transform, new UnboundedDataset<>(redistributedStream, sourceIds));
    }

    @Override
    public String toNativeString() {
      return "repartition(...)";
    }
  };
}

// Registry of transform evaluators keyed by transform URN string.
private static final Map<String, TransformEvaluator<?>> EVALUATORS = new HashMap<>();

static {
Expand All @@ -634,8 +566,6 @@ public String toNativeString() {
EVALUATORS.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, window());
EVALUATORS.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, flattenPColl());
EVALUATORS.put(PTransformTranslation.RESHUFFLE_URN, reshuffle());
EVALUATORS.put(PTransformTranslation.REDISTRIBUTE_BY_KEY_URN, redistributeByKey());
EVALUATORS.put(PTransformTranslation.REDISTRIBUTE_ARBITRARILY_URN, redistributeArbitrarily());
// For testing only
EVALUATORS.put(CreateStream.TRANSFORM_URN, createFromQueue());
EVALUATORS.put(PTransformTranslation.TEST_STREAM_TRANSFORM_URN, createFromTestStream());
Expand Down

0 comments on commit ece456c

Please sign in to comment.