Skip to content

Commit

Permalink
[BEAM-12976] Test a whole pipeline using projection pushdown in BQ IO.
Browse files Browse the repository at this point in the history
  • Loading branch information
ibzib committed Mar 17, 2022
1 parent f7d9e6a commit b88bed5
Showing 1 changed file with 58 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,24 @@
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableRowParser;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -51,7 +59,8 @@ public class BigQueryIOStorageReadIT {
"empty", 0L,
"1M", 10592L,
"1G", 11110839L,
"1T", 11110839000L);
"1T", 11110839000L,
"multi_field", 11110839L);

private static final String DATASET_ID = "big_query_storage";
private static final String TABLE_PREFIX = "storage_read_";
Expand Down Expand Up @@ -113,4 +122,52 @@ public void testBigQueryStorageRead1GArrow() throws Exception {
setUpTestEnvironment("1G", DataFormat.ARROW);
runBigQueryIOStorageReadPipeline();
}

/**
 * Exercises a pipeline that gives {@link
 * org.apache.beam.runners.core.construction.graph.ProjectionPushdownOptimizer} an opportunity to
 * push the field access down into the read, depending on the runner. The pipeline must complete
 * successfully whether or not the optimization is applied.
 */
@Test
public void testBigQueryStorageReadProjectionPushdown() throws Exception {
  setUpTestEnvironment("multi_field", DataFormat.AVRO);

  // Beam schema matching the two columns of the "multi_field" table.
  Schema multiFieldSchema =
      Schema.builder()
          .addNullableField("string_field", FieldType.STRING)
          .addNullableField("int_field", FieldType.INT64)
          .build();

  Pipeline pipeline = Pipeline.create(options);

  // Read the table as Rows. Only "int_field" is consumed downstream, which makes this read a
  // candidate for projection pushdown.
  PCollection<Row> rows =
      pipeline.apply(
          "Read",
          BigQueryIO.read(
                  record ->
                      BigQueryUtils.toBeamRow(
                          record.getRecord(),
                          multiFieldSchema,
                          ConversionOptions.builder().build()))
              .from(options.getInputTable())
              .withMethod(Method.DIRECT_READ)
              .withFormat(options.getDataFormat())
              .withCoder(SchemaCoder.of(multiFieldSchema)));

  PCollection<Long> rowCount =
      rows.apply(ParDo.of(new GetIntField())).apply("Count", Count.globally());

  PAssert.thatSingleton(rowCount).isEqualTo(options.getNumRecords());
  pipeline.run().waitUntilFinish();
}

/**
 * Extracts {@code int_field} from each input {@link Row}, declaring a narrow field access so the
 * runner can see that only a single field of the input is read.
 */
private static class GetIntField extends DoFn<Row, Long> {
  // Declares that this DoFn touches only "int_field"; Beam discovers this descriptor through the
  // @FieldAccess annotation, matching the @FieldAccess("row") parameter in processElement.
  @SuppressWarnings("unused") // used by reflection
  @FieldAccess("row")
  private final FieldAccessDescriptor fieldAccess =
      FieldAccessDescriptor.withFieldNames("int_field");

  @ProcessElement
  public void processElement(@FieldAccess("row") Row row, OutputReceiver<Long> out)
      throws Exception {
    out.output(row.getValue("int_field"));
  }
}
}

0 comments on commit b88bed5

Please sign in to comment.