From b88bed52e687e61d03ce0341ad71ced8e49278d8 Mon Sep 17 00:00:00 2001
From: Kyle Weaver
Date: Thu, 17 Mar 2022 13:14:10 -0700
Subject: [PATCH 1/2] [BEAM-12976] Test a whole pipeline using projection
 pushdown in BQ IO.

---
 .../gcp/bigquery/BigQueryIOStorageReadIT.java | 59 ++++++++++++++++++-
 1 file changed, 58 insertions(+), 1 deletion(-)

diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java
index d2794a656f13..b7c50c8054af 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadIT.java
@@ -23,16 +23,24 @@
 import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableRowParser;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.FieldType;
+import org.apache.beam.sdk.schemas.SchemaCoder;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -51,7 +59,8 @@ public class BigQueryIOStorageReadIT {
           "empty", 0L,
           "1M", 10592L,
           "1G", 11110839L,
-          "1T", 11110839000L);
+          "1T", 11110839000L,
+          "multi_field", 11110839L);
 
   private static final String DATASET_ID = "big_query_storage";
   private static final String TABLE_PREFIX = "storage_read_";
@@ -113,4 +122,52 @@ public void testBigQueryStorageRead1GArrow() throws Exception {
     setUpTestEnvironment("1G", DataFormat.ARROW);
     runBigQueryIOStorageReadPipeline();
   }
+
+  /**
+   * Tests a pipeline where {@link
+   * org.apache.beam.runners.core.construction.graph.ProjectionPushdownOptimizer} may apply
+   * optimizations, depending on the runner. The pipeline should run successfully either way.
+   */
+  @Test
+  public void testBigQueryStorageReadProjectionPushdown() throws Exception {
+    setUpTestEnvironment("multi_field", DataFormat.AVRO);
+
+    Schema multiFieldSchema =
+        Schema.builder()
+            .addNullableField("string_field", FieldType.STRING)
+            .addNullableField("int_field", FieldType.INT64)
+            .build();
+
+    Pipeline p = Pipeline.create(options);
+    PCollection<Long> count =
+        p.apply(
+                "Read",
+                BigQueryIO.read(
+                        record ->
+                            BigQueryUtils.toBeamRow(
+                                record.getRecord(),
+                                multiFieldSchema,
+                                ConversionOptions.builder().build()))
+                    .from(options.getInputTable())
+                    .withMethod(Method.DIRECT_READ)
+                    .withFormat(options.getDataFormat())
+                    .withCoder(SchemaCoder.of(multiFieldSchema)))
+            .apply(ParDo.of(new GetIntField()))
+            .apply("Count", Count.globally());
+    PAssert.thatSingleton(count).isEqualTo(options.getNumRecords());
+    p.run().waitUntilFinish();
+  }
+
+  private static class GetIntField extends DoFn<Row, Long> {
+    @SuppressWarnings("unused") // used by reflection
+    @FieldAccess("row")
+    private final FieldAccessDescriptor fieldAccessDescriptor =
+        FieldAccessDescriptor.withFieldNames("int_field");
+
+    @ProcessElement
+    public void processElement(@FieldAccess("row") Row row, OutputReceiver<Long> outputReceiver)
+        throws Exception {
+      outputReceiver.output(row.getValue("int_field"));
+    }
+  }
 }

From eb5fbd33547db7edd0aa366621457872843795c8 Mon Sep 17 00:00:00 2001
From: Kyle Weaver
Date: Thu, 17 Mar 2022 13:14:48 -0700
Subject: [PATCH 2/2] [BEAM-12976] Enable projection pushdown for Java
 pipelines on Dataflow, Flink, and Spark.

---
 CHANGES.md                                                | 1 +
 .../java/org/apache/beam/runners/flink/FlinkRunner.java  | 6 ++++++
 .../org/apache/beam/runners/dataflow/DataflowRunner.java | 8 +++++++-
 .../java/org/apache/beam/runners/spark/SparkRunner.java  | 6 ++++++
 4 files changed, 20 insertions(+), 1 deletion(-)

diff --git a/CHANGES.md b/CHANGES.md
index de9a0b995920..d6698c9e83a0 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -57,6 +57,7 @@
 * New highly anticipated feature Y added to Java SDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).
 
 ## I/Os
+* Introduced a projection pushdown optimizer to the Java SDK ([BEAM-12976](https://issues.apache.org/jira/browse/BEAM-12976)). The optimizer currently works only with the [BigQuery Storage API](https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-api), but support for more I/Os will be added in future releases. If you encounter a bug in the optimizer, please file a JIRA and disable the optimizer with the pipeline option `--experiments=disable_projection_pushdown`.
 * Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
 * `amazon-web-services2` has reached feature parity and is finally recommended over the earlier `amazon-web-services` and `kinesis` modules (Java). These will be deprecated in one of the next releases ([BEAM-13174](https://issues.apache.org/jira/browse/BEAM-13174)).
 
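Aside: the escape hatch named in the CHANGES.md entry above can also be set programmatically. Below is a minimal sketch assuming only `beam-sdks-java-core` on the classpath; the class name `DisablePushdownExample` is illustrative and not part of the patch.

```java
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DisablePushdownExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Equivalent to passing --experiments=disable_projection_pushdown on the
    // command line; the runner changes below check for exactly this string.
    ExperimentalOptions.addExperiment(
        options.as(ExperimentalOptions.class), "disable_projection_pushdown");
    // Prints "true": the guard added to each runner will now skip the optimizer.
    System.out.println(
        ExperimentalOptions.hasExperiment(options, "disable_projection_pushdown"));
  }
}
```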
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
index c4735b8f38d4..ee5e5a25f027 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunner.java
@@ -23,6 +23,7 @@
 import java.util.SortedSet;
 import java.util.TreeSet;
 import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.graph.ProjectionPushdownOptimizer;
 import org.apache.beam.runners.core.metrics.MetricsPusher;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
@@ -81,6 +82,11 @@ public PipelineResult run(Pipeline pipeline) {
       SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
 
+    if (!options.isStreaming()
+        && !ExperimentalOptions.hasExperiment(options, "disable_projection_pushdown")) {
+      ProjectionPushdownOptimizer.optimize(pipeline);
+    }
+
     logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
 
     MetricsEnvironment.setMetricsSupported(true);

diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 7d869103a934..8aa6ebf2af47 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -76,6 +76,7 @@
 import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
 import org.apache.beam.runners.core.construction.UnconsumedReads;
 import org.apache.beam.runners.core.construction.WriteFilesTranslation;
+import org.apache.beam.runners.core.construction.graph.ProjectionPushdownOptimizer;
 import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
 import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
 import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
@@ -113,6 +114,7 @@
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -1025,7 +1027,6 @@ private List<String> getDefaultArtifacts() {
 
   @Override
   public DataflowPipelineJob run(Pipeline pipeline) {
-
     if (useUnifiedWorker(options)) {
       List<String> experiments = options.getExperiments(); // non-null if useUnifiedWorker is true
       if (!experiments.contains("use_runner_v2")) {
@@ -1048,6 +1049,11 @@ public DataflowPipelineJob run(Pipeline pipeline) {
       options.setStreaming(true);
     }
 
+    if (!options.isStreaming()
+        && !ExperimentalOptions.hasExperiment(options, "disable_projection_pushdown")) {
+      ProjectionPushdownOptimizer.optimize(pipeline);
+    }
+
     LOG.info(
         "Executing pipeline on the Dataflow Service, which will have billing implications "
             + "related to Google Compute Engine usage and other Google Cloud Services.");
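The batch-only, experiment-gated guard added to FlinkRunner and DataflowRunner above reappears verbatim in SparkRunner below. As a reading aid, the shared shape could be factored into a helper like the following sketch; `PushdownGuard` is hypothetical, and the patch deliberately inlines the check in each runner instead:

```java
import org.apache.beam.runners.core.construction.graph.ProjectionPushdownOptimizer;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.StreamingOptions;

final class PushdownGuard {
  private PushdownGuard() {}

  // Pushdown runs only for batch pipelines, and only when the
  // disable_projection_pushdown escape hatch is absent.
  static void optimizeIfEnabled(Pipeline pipeline, StreamingOptions options) {
    if (!options.isStreaming()
        && !ExperimentalOptions.hasExperiment(options, "disable_projection_pushdown")) {
      ProjectionPushdownOptimizer.optimize(pipeline);
    }
  }
}
```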
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
index ed434a2e348b..c15cf15df057 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/SparkRunner.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.Future;
 import org.apache.beam.runners.core.construction.SplittableParDo;
 import org.apache.beam.runners.core.construction.TransformInputs;
+import org.apache.beam.runners.core.construction.graph.ProjectionPushdownOptimizer;
 import org.apache.beam.runners.core.metrics.MetricsPusher;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
 import org.apache.beam.runners.spark.metrics.AggregatorMetricSource;
@@ -160,6 +161,11 @@ public SparkPipelineResult run(final Pipeline pipeline) {
       SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
     }
 
+    if (!pipelineOptions.isStreaming()
+        && !ExperimentalOptions.hasExperiment(pipelineOptions, "disable_projection_pushdown")) {
+      ProjectionPushdownOptimizer.optimize(pipeline);
+    }
+
     pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(pipelineOptions.isStreaming()));
 
     prepareFilesToStage(pipelineOptions);
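For context on why the new integration test exercises pushdown at all: the `@FieldAccess` declaration in `GetIntField` (patch 1) is what tells the optimizer that only `int_field` is consumed downstream, so the runner can push a narrower read into the BigQuery storage source. A minimal sketch of how such a descriptor resolves against the test's two-field schema, runnable with only `beam-sdks-java-core` on the classpath (the class name `FieldAccessExample` is illustrative):

```java
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;

public class FieldAccessExample {
  public static void main(String[] args) {
    // Same shape as multiFieldSchema in BigQueryIOStorageReadIT.
    Schema schema =
        Schema.builder()
            .addNullableField("string_field", FieldType.STRING)
            .addNullableField("int_field", FieldType.INT64)
            .build();
    // Resolving against the schema validates the field names; the resolved
    // descriptor is what reports which fields a DoFn actually accesses.
    FieldAccessDescriptor descriptor =
        FieldAccessDescriptor.withFieldNames("int_field").resolve(schema);
    System.out.println(descriptor.fieldNamesAccessed()); // [int_field]
  }
}
```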