Merge pull request apache#17107 from ibzib/enable-optimizer

[BEAM-12976] Enable projection pushdown in runners

ibzib authored Mar 18, 2022
2 parents 0510cec + eb5fbd3 commit 3e472e2
Showing 5 changed files with 78 additions and 2 deletions.
CHANGES.md (1 change: 1 addition & 0 deletions)
@@ -57,6 +57,7 @@
* New highly anticipated feature Y added to Java SDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).

## I/Os
* Introduce projection pushdown optimizer to the Java SDK ([BEAM-12976](https://issues.apache.org/jira/browse/BEAM-12976)). The optimizer currently only works on the [BigQuery Storage API](https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-api), but more I/Os will be added in future releases. If you encounter a bug with the optimizer, please file a JIRA and disable the optimizer using pipeline option `--experiments=disable_projection_pushdown`.

* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* `amazon-web-services2` has reached feature parity and is finally recommended over the earlier `amazon-web-services` and `kinesis` modules (Java). These will be deprecated in one of the next releases ([BEAM-13174](https://issues.apache.org/jira/browse/BEAM-13174)).
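For context, the opt-out flag described in the changelog entry above can also be set programmatically. A minimal sketch (not part of this commit, assuming the standard PipelineOptionsFactory and ExperimentalOptions APIs that appear elsewhere in this diff):

    // Sketch only: equivalent to passing --experiments=disable_projection_pushdown
    // on the command line.
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    ExperimentalOptions.addExperiment(
        options.as(ExperimentalOptions.class), "disable_projection_pushdown");
    Pipeline pipeline = Pipeline.create(options);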
@@ -23,6 +23,7 @@
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.graph.ProjectionPushdownOptimizer;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -81,6 +82,11 @@ public PipelineResult run(Pipeline pipeline) {
      SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
    }

    if (!options.isStreaming()
        && !ExperimentalOptions.hasExperiment(options, "disable_projection_pushdown")) {
      ProjectionPushdownOptimizer.optimize(pipeline);
    }

    logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);

    MetricsEnvironment.setMetricsSupported(true);
@@ -76,6 +76,7 @@
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.core.construction.UnconsumedReads;
import org.apache.beam.runners.core.construction.WriteFilesTranslation;
import org.apache.beam.runners.core.construction.graph.ProjectionPushdownOptimizer;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
@@ -113,6 +114,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -1027,7 +1029,6 @@ private List<RunnerApi.ArtifactInformation> getDefaultArtifacts() {

  @Override
  public DataflowPipelineJob run(Pipeline pipeline) {

    if (useUnifiedWorker(options)) {
      List<String> experiments = options.getExperiments(); // non-null if useUnifiedWorker is true
      if (!experiments.contains("use_runner_v2")) {
@@ -1050,6 +1051,11 @@ public DataflowPipelineJob run(Pipeline pipeline) {
      options.setStreaming(true);
    }

    if (!options.isStreaming()
        && !ExperimentalOptions.hasExperiment(options, "disable_projection_pushdown")) {
      ProjectionPushdownOptimizer.optimize(pipeline);
    }

    LOG.info(
        "Executing pipeline on the Dataflow Service, which will have billing implications "
            + "related to Google Compute Engine usage and other Google Cloud Services.");
@@ -28,6 +28,7 @@
import java.util.concurrent.Future;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.TransformInputs;
import org.apache.beam.runners.core.construction.graph.ProjectionPushdownOptimizer;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.metrics.AggregatorMetricSource;
@@ -160,6 +161,11 @@ public SparkPipelineResult run(final Pipeline pipeline) {
      SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
    }

    if (!pipelineOptions.isStreaming()
        && !ExperimentalOptions.hasExperiment(pipelineOptions, "disable_projection_pushdown")) {
      ProjectionPushdownOptimizer.optimize(pipeline);
    }

    pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(pipelineOptions.isStreaming()));

    prepareFilesToStage(pipelineOptions);
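The same batch-only, opt-out guard appears in all three runner changes above. As an illustrative sketch only (this helper is hypothetical and not part of the commit), the shared logic amounts to:

    // Hypothetical helper capturing the guard each runner repeats: apply projection
    // pushdown only to batch pipelines that have not opted out via the experiment flag.
    private static void maybeApplyProjectionPushdown(Pipeline pipeline, StreamingOptions options) {
      if (!options.isStreaming()
          && !ExperimentalOptions.hasExperiment(options, "disable_projection_pushdown")) {
        ProjectionPushdownOptimizer.optimize(pipeline);
      }
    }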
@@ -23,16 +23,24 @@
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableRowParser;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.ConversionOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -51,7 +59,8 @@ public class BigQueryIOStorageReadIT {
"empty", 0L,
"1M", 10592L,
"1G", 11110839L,
"1T", 11110839000L);
"1T", 11110839000L,
"multi_field", 11110839L);

private static final String DATASET_ID = "big_query_storage";
private static final String TABLE_PREFIX = "storage_read_";
@@ -113,4 +122,52 @@ public void testBigQueryStorageRead1GArrow() throws Exception {
    setUpTestEnvironment("1G", DataFormat.ARROW);
    runBigQueryIOStorageReadPipeline();
  }

  /**
   * Tests a pipeline where {@link
   * org.apache.beam.runners.core.construction.graph.ProjectionPushdownOptimizer} may do
   * optimizations, depending on the runner. The pipeline should run successfully either way.
   */
  @Test
  public void testBigQueryStorageReadProjectionPushdown() throws Exception {
    setUpTestEnvironment("multi_field", DataFormat.AVRO);

    Schema multiFieldSchema =
        Schema.builder()
            .addNullableField("string_field", FieldType.STRING)
            .addNullableField("int_field", FieldType.INT64)
            .build();

    Pipeline p = Pipeline.create(options);
    PCollection<Long> count =
        p.apply(
                "Read",
                BigQueryIO.read(
                        record ->
                            BigQueryUtils.toBeamRow(
                                record.getRecord(),
                                multiFieldSchema,
                                ConversionOptions.builder().build()))
                    .from(options.getInputTable())
                    .withMethod(Method.DIRECT_READ)
                    .withFormat(options.getDataFormat())
                    .withCoder(SchemaCoder.of(multiFieldSchema)))
            .apply(ParDo.of(new GetIntField()))
            .apply("Count", Count.globally());
    PAssert.thatSingleton(count).isEqualTo(options.getNumRecords());
    p.run().waitUntilFinish();
  }

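  // Editor's note (comment not in the original diff): GetIntField reads only
  // "int_field" and advertises that through a FieldAccessDescriptor; this
  // field-access metadata is what lets ProjectionPushdownOptimizer prune unused
  // fields from the BigQuery read.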
  private static class GetIntField extends DoFn<Row, Long> {
    @SuppressWarnings("unused") // used by reflection
    @FieldAccess("row")
    private final FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("int_field");

    @ProcessElement
    public void processElement(@FieldAccess("row") Row row, OutputReceiver<Long> outputReceiver)
        throws Exception {
      outputReceiver.output(row.getValue("int_field"));
    }
  }
}
