[BEAM-12976] Enable projection pushdown for Java pipelines on Dataflow, Flink, and Spark.
ibzib committed Mar 17, 2022
1 parent b88bed5 commit eb5fbd3
Showing 4 changed files with 20 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGES.md
@@ -57,6 +57,7 @@
* New highly anticipated feature Y added to Java SDK ([BEAM-Y](https://issues.apache.org/jira/browse/BEAM-Y)).

## I/Os
+* Introduce projection pushdown optimizer to the Java SDK ([BEAM-12976](https://issues.apache.org/jira/browse/BEAM-12976)). The optimizer currently only works on the [BigQuery Storage API](https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-api), but more I/Os will be added in future releases. If you encounter a bug with the optimizer, please file a JIRA and disable the optimizer using pipeline option `--experiments=disable_projection_pushdown`.

* Support for X source added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
* `amazon-web-services2` has reached feature parity and is finally recommended over the earlier `amazon-web-services` and `kinesis` modules (Java). These will be deprecated in one of the next releases ([BEAM-13174](https://issues.apache.org/jira/browse/BEAM-13174)).
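To make the CHANGES.md entry concrete, here is a minimal sketch (not part of this commit) of a batch pipeline the optimizer can rewrite; the project, dataset, table, and field names are hypothetical:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class PushdownExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline p = Pipeline.create(options);

    PCollection<Row> userIds =
        p
            // Schema-aware read through the BigQuery Storage API (DIRECT_READ),
            // currently the only source the optimizer understands.
            .apply(
                BigQueryIO.readTableRowsWithSchema()
                    .from("my-project:my_dataset.events") // hypothetical table
                    .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ))
            // Only "user_id" is consumed downstream, so in batch mode the
            // optimizer can push this projection into the read itself.
            .apply(Select.fieldNames("user_id"));

    p.run().waitUntilFinish();
  }
}
```

Because only `user_id` survives the `Select`, the optimizer can narrow the Storage API read to that single column instead of fetching every field of the table.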
6 changes: 6 additions & 0 deletions FlinkRunner.java
@@ -23,6 +23,7 @@
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.beam.runners.core.construction.SplittableParDo;
+import org.apache.beam.runners.core.construction.graph.ProjectionPushdownOptimizer;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
@@ -81,6 +82,11 @@ public PipelineResult run(Pipeline pipeline) {
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
}

+    if (!options.isStreaming()
+        && !ExperimentalOptions.hasExperiment(options, "disable_projection_pushdown")) {
+      ProjectionPushdownOptimizer.optimize(pipeline);
+    }
+
logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);

MetricsEnvironment.setMetricsSupported(true);
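The guard added to FlinkRunner above is repeated, modulo the options variable name, in the Dataflow and Spark runners below. As a hedged sketch of the opt-out path: `ExperimentalOptions.addExperiment` and `hasExperiment` are existing Beam APIs, while the surrounding main method is purely illustrative:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class DisablePushdownExample {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();

    // Equivalent to passing --experiments=disable_projection_pushdown on the
    // command line; each runner's run() checks this experiment before calling
    // ProjectionPushdownOptimizer.optimize(pipeline).
    ExperimentalOptions.addExperiment(
        options.as(ExperimentalOptions.class), "disable_projection_pushdown");

    Pipeline p = Pipeline.create(options);
    // ... build the pipeline as usual ...
    p.run().waitUntilFinish();
  }
}
```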
8 changes: 7 additions & 1 deletion DataflowRunner.java
@@ -76,6 +76,7 @@
import org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource;
import org.apache.beam.runners.core.construction.UnconsumedReads;
import org.apache.beam.runners.core.construction.WriteFilesTranslation;
+import org.apache.beam.runners.core.construction.graph.ProjectionPushdownOptimizer;
import org.apache.beam.runners.dataflow.DataflowPipelineTranslator.JobSpecification;
import org.apache.beam.runners.dataflow.StreamingViewOverrides.StreamingCreatePCollectionViewFactory;
import org.apache.beam.runners.dataflow.TransformTranslator.StepTranslationContext;
@@ -113,6 +114,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource;
import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsValidator;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -1025,7 +1027,6 @@ private List<RunnerApi.ArtifactInformation> getDefaultArtifacts() {

@Override
public DataflowPipelineJob run(Pipeline pipeline) {
-
if (useUnifiedWorker(options)) {
List<String> experiments = options.getExperiments(); // non-null if useUnifiedWorker is true
if (!experiments.contains("use_runner_v2")) {
@@ -1048,6 +1049,11 @@ public DataflowPipelineJob run(Pipeline pipeline) {
options.setStreaming(true);
}

+    if (!options.isStreaming()
+        && !ExperimentalOptions.hasExperiment(options, "disable_projection_pushdown")) {
+      ProjectionPushdownOptimizer.optimize(pipeline);
+    }
+
LOG.info(
"Executing pipeline on the Dataflow Service, which will have billing implications "
+ "related to Google Compute Engine usage and other Google Cloud Services.");
6 changes: 6 additions & 0 deletions SparkRunner.java
@@ -28,6 +28,7 @@
import java.util.concurrent.Future;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.TransformInputs;
+import org.apache.beam.runners.core.construction.graph.ProjectionPushdownOptimizer;
import org.apache.beam.runners.core.metrics.MetricsPusher;
import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
import org.apache.beam.runners.spark.metrics.AggregatorMetricSource;
@@ -160,6 +161,11 @@ public SparkPipelineResult run(final Pipeline pipeline) {
SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReadsIfNecessary(pipeline);
}

+    if (!pipelineOptions.isStreaming()
+        && !ExperimentalOptions.hasExperiment(pipelineOptions, "disable_projection_pushdown")) {
+      ProjectionPushdownOptimizer.optimize(pipeline);
+    }
+
pipeline.replaceAll(SparkTransformOverrides.getDefaultOverrides(pipelineOptions.isStreaming()));

prepareFilesToStage(pipelineOptions);
