Skip to content

Commit

Permalink
[BEAM-12384] Set output typeDescriptor explicitly in Read.Bounded tran…
Browse files Browse the repository at this point in the history
…sform
  • Loading branch information
iemejia committed May 26, 2021
1 parent 331c67c commit 3b705c1
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ public final PCollection<T> expand(PBegin input) {
.apply(ParDo.of(new OutputSingleSource<>(source)))
.setCoder(SerializableCoder.of(new TypeDescriptor<BoundedSource<T>>() {}))
.apply(ParDo.of(new BoundedSourceAsSDFWrapperFn<>()))
.setCoder(source.getOutputCoder());
.setCoder(source.getOutputCoder())
.setTypeDescriptor(source.getOutputCoder().getEncodedTypeDescriptor());
}

/** Returns the {@code BoundedSource} used to create this {@code Read} {@code PTransform}. */
Expand Down
43 changes: 43 additions & 0 deletions sdks/java/core/src/test/java/org/apache/beam/sdk/io/ReadTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -41,6 +42,7 @@
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.io.CountingSource.CounterMark;
Expand All @@ -63,6 +65,7 @@
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -149,6 +152,17 @@ public void populateDisplayData(DisplayData.Builder builder) {
assertThat(unboundedDisplayData, hasDisplayItem("maxReadTime", maxReadTime));
}

/**
 * Verifies that the {@code PCollection} produced by {@code Read.from} on a bounded source carries
 * the expected type descriptor, for both a simple element type and a parameterized element type.
 */
@Test
public void testReadBoundedPreservesTypeDescriptor() {
  // Simple element type: the descriptor's underlying type must be String.
  PCollection<String> strings = pipeline.apply(Read.from(new SerializableBoundedSource()));
  assertEquals(String.class, strings.getTypeDescriptor().getType());

  // Parameterized element type: the full generic type List<Long> must be reported,
  // not merely the raw List.
  TypeDescriptor<List<Long>> expectedListType = new TypeDescriptor<List<Long>>() {};
  ListBoundedSource<Long> listSource = new ListBoundedSource<>(VarLongCoder.of());
  PCollection<List<Long>> longLists = pipeline.apply(Read.from(listSource));
  assertEquals(expectedListType, longLists.getTypeDescriptor());
}

@Test
@Category({
NeedsRunner.class,
Expand Down Expand Up @@ -261,6 +275,35 @@ public Coder<String> getOutputCoder() {
}
}

/**
 * Test-only {@link BoundedSource} whose element type is {@code List<T>}.
 *
 * <p>Only {@link #getOutputCoder()} returns a meaningful value: a {@link ListCoder} wrapping the
 * element coder supplied at construction. The remaining overrides are placeholders that return
 * {@code null} or {@code 0}.
 *
 * <p>NOTE(review): the placeholders are presumably never invoked because the test that uses this
 * class only constructs the transform and does not run the pipeline; confirm before reusing this
 * source in a test that executes a pipeline.
 */
private static class ListBoundedSource<T> extends BoundedSource<List<T>> {
  /** Coder for the individual list elements; assigned once in the constructor. */
  private final Coder<T> coder;

  /**
   * Creates a source whose output coder is a {@link ListCoder} over the given element coder.
   *
   * @param coder coder for the elements contained in each emitted list
   */
  ListBoundedSource(Coder<T> coder) {
    this.coder = coder;
  }

  /** Placeholder: performs no splitting and returns {@code null}. */
  @Override
  public List<? extends BoundedSource<List<T>>> split(
      long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
    return null;
  }

  /** Placeholder: always reports an estimated size of {@code 0} bytes. */
  @Override
  public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
    return 0;
  }

  /** Placeholder: creates no reader and returns {@code null}. */
  @Override
  public BoundedReader<List<T>> createReader(PipelineOptions options) throws IOException {
    return null;
  }

  /** Returns a {@link ListCoder} built from the element coder passed to the constructor. */
  @Override
  public Coder<List<T>> getOutputCoder() {
    return ListCoder.of(coder);
  }
}

private static class NotSerializableBoundedSource extends CustomBoundedSource {
@SuppressWarnings("unused")
private final NotSerializableClass notSerializableClass = new NotSerializableClass();
Expand Down

0 comments on commit 3b705c1

Please sign in to comment.