Skip to content

Commit

Permalink
Add a workaround for BEAM-20873 for optimized list side inputs. (#31163)
Browse files Browse the repository at this point in the history
This triggered a case with Dataflow Runner v1 and PTransformOverrides
that exposed #20873.
  • Loading branch information
robertwb authored May 3, 2024
1 parent 7090260 commit afe0793
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews.HasDefaultValue;
import org.apache.beam.sdk.values.PCollectionViews.IterableBackedListViewFn;
import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn;
import org.apache.beam.sdk.values.PCollectionViews.IterableViewFn2;
import org.apache.beam.sdk.values.PCollectionViews.ListViewFn;
Expand Down Expand Up @@ -354,7 +355,8 @@ public <ViewT> ViewT get(final PCollectionView<ViewT> view, final BoundedWindow
if (viewFn instanceof IterableViewFn
|| viewFn instanceof IterableViewFn2
|| viewFn instanceof ListViewFn
|| viewFn instanceof ListViewFn2) {
|| viewFn instanceof ListViewFn2
|| viewFn instanceof IterableBackedListViewFn) {
@SuppressWarnings("unchecked")
ViewT viewT = (ViewT) getListForWindow(tag, window);
return viewT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,11 +286,18 @@ public PCollectionView<List<T>> expand(PCollection<T> input) {
}

/**
 * Builds a {@code PCollectionView<List<T>>} with {@code PCollectionViews.listView}, applies
 * {@code CreatePCollectionView.of(view)} to {@code input}, and returns the view.
 *
 * <p>An identity {@code MapElements} step is applied to {@code input} first; see the HACK note
 * in the body for why.
 *
 * @param input the collection the list view is created for
 * @return the view produced by {@code PCollectionViews.listView}
 */
private PCollectionView<List<T>> expandWithoutRandomAccess(PCollection<T> input) {
// Read the coder from the original input; its encoded type descriptor is used for the view below.
Coder<T> inputCoder = input.getCoder();
// HACK to work around https://github.com/apache/beam/issues/20873:
// There are bugs in "composite" vs "primitive" transform distinction
// in TransformHierarchy. This noop transform works around them and should be zero
// cost.
PCollection<T> materializationInput =
input.apply(MapElements.via(new SimpleFunction<T, T>(x -> x) {}));
PCollectionView<List<T>> view =
PCollectionViews.listView(
// NOTE(review): this listing looks like a diff view with the +/- markers stripped. The next
// three lines appear to be the removed (pre-change) arguments and the three after them the
// added replacements -- confirm against the file at this commit before relying on either set.
input,
(TypeDescriptorSupplier<T>) input.getCoder()::getEncodedTypeDescriptor,
input.getWindowingStrategy());
materializationInput,
(TypeDescriptorSupplier<T>) inputCoder::getEncodedTypeDescriptor,
materializationInput.getWindowingStrategy());
// NOTE(review): CreatePCollectionView is applied to `input`, while the replacement arguments
// above build the view from `materializationInput` -- confirm this asymmetry is intended.
input.apply(CreatePCollectionView.of(view));
return view;
}
Expand Down

0 comments on commit afe0793

Please sign in to comment.