diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java index e39a38a74c2c..4433b4b0475d 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PipelineTranslation.java @@ -56,6 +56,14 @@ public static RunnerApi.Pipeline toProto( final Pipeline pipeline, final SdkComponents components, boolean useDeprecatedViewTransforms) { + return toProto(pipeline, components, useDeprecatedViewTransforms, true); + } + + public static RunnerApi.Pipeline toProto( + final Pipeline pipeline, + final SdkComponents components, + boolean useDeprecatedViewTransforms, + boolean upgradeTransforms) { final List rootIds = new ArrayList<>(); pipeline.traverseTopologically( new PipelineVisitor.Defaults() { @@ -106,7 +114,7 @@ public void visitPrimitiveTransform(Node node) { ExternalTranslationOptions externalTranslationOptions = pipeline.getOptions().as(ExternalTranslationOptions.class); List urnsToOverride = externalTranslationOptions.getTransformsToOverride(); - if (urnsToOverride.size() > 0) { + if (urnsToOverride.size() > 0 && upgradeTransforms) { try (TransformUpgrader upgrader = TransformUpgrader.of()) { res = upgrader.upgradeTransformsViaTransformService( diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java index b142ab4af1c9..4f1a02165d23 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/TransformUpgrader.java @@ -62,6 +62,8 @@ public class TransformUpgrader implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(TransformUpgrader.class); private static final String UPGRADE_NAMESPACE = "transform:upgrade:"; + @VisibleForTesting static final String UPGRADE_KEY = "upgraded_to_version"; + private ExpansionServiceClientFactory clientFactory; private TransformUpgrader() { @@ -148,7 +150,8 @@ public RunnerApi.Pipeline upgradeTransformsViaTransformService( for (String transformId : transformsToOverride) { pipeline = - updateTransformViaTransformService(pipeline, transformId, expansionServiceEndpoint); + updateTransformViaTransformService( + pipeline, transformId, expansionServiceEndpoint, options); } if (service != null) { @@ -165,14 +168,14 @@ public RunnerApi.Pipeline upgradeTransformsViaTransformService( RunnerApi.Pipeline updateTransformViaTransformService( RunnerApi.Pipeline runnerAPIpipeline, String transformId, - Endpoints.ApiServiceDescriptor transformServiceEndpoint) + Endpoints.ApiServiceDescriptor transformServiceEndpoint, + ExternalTranslationOptions options) throws IOException { RunnerApi.PTransform transformToUpgrade = runnerAPIpipeline.getComponents().getTransformsMap().get(transformId); if (transformToUpgrade == null) { throw new IllegalArgumentException("Could not find a transform with the ID " + transformId); } - ByteString configRowBytes = transformToUpgrade.getAnnotationsOrThrow(PTransformTranslation.CONFIG_ROW_KEY); ByteString configRowSchemaBytes = @@ -236,6 +239,17 @@ RunnerApi.Pipeline updateTransformViaTransformService( newEnvironmentsWithDependencies, transformServiceEndpoint)) .build(); RunnerApi.PTransform expandedTransform = response.getTransform(); + + // Adds an annotation that denotes the Beam version the transform was upgraded to. + RunnerApi.PTransform.Builder expandedTransformBuilder = expandedTransform.toBuilder(); + String transformServiceVersion = options.getTransformServiceBeamVersion(); + if (transformServiceVersion == null || transformServiceVersion.isEmpty()) { + transformServiceVersion = "unknown"; + } + expandedTransformBuilder.putAnnotations( + UPGRADE_KEY, ByteString.copyFromUtf8(transformServiceVersion)); + expandedTransform = expandedTransformBuilder.build(); + List expandedRequirements = response.getRequirementsList(); RunnerApi.Components.Builder newComponentsBuilder = expandedComponents.toBuilder(); diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java index 6620e780bc16..e14fa556dd90 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/TransformUpgraderTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.core.construction; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import com.google.auto.service.AutoService; import java.io.ByteArrayInputStream; @@ -284,6 +285,9 @@ public void testTransformUpgrade() throws Exception { .get("TransformUpgraderTest-TestTransform"); validateTestParam(upgradedTransform, 4); + + // Confirm that the upgraded transform includes the upgrade annotation. + assertTrue(upgradedTransform.getAnnotationsMap().containsKey(TransformUpgrader.UPGRADE_KEY)); } @Test diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index ea7a7d9e7314..725903aaaa27 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -64,6 +64,7 @@ import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory; import org.apache.beam.runners.core.construction.Environments; import org.apache.beam.runners.core.construction.External; +import org.apache.beam.runners.core.construction.ExternalTranslationOptions; import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; @@ -1101,14 +1102,25 @@ public void visitPrimitiveTransform(Node node) { return visitor.isMultiLanguage; } + private static boolean includesTransformUpgrades(Pipeline pipeline) { + return (pipeline + .getOptions() + .as(ExternalTranslationOptions.class) + .getTransformsToOverride() + .size() + > 0); + } + @Override public DataflowPipelineJob run(Pipeline pipeline) { - if (DataflowRunner.isMultiLanguagePipeline(pipeline)) { + // Multi-language pipelines and pipelines that include upgrades should automatically be upgraded + // to Runner v2. + if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) { List experiments = firstNonNull(options.getExperiments(), Collections.emptyList()); if (!experiments.contains("use_runner_v2")) { LOG.info( "Automatically enabling Dataflow Runner v2 since the pipeline used cross-language" - + " transforms"); + + " transforms or pipeline needed a transform upgrade."); options.setExperiments( ImmutableList.builder().addAll(experiments).add("use_runner_v2").build()); } @@ -1217,8 +1229,9 @@ public DataflowPipelineJob run(Pipeline pipeline) { .addAllDependencies(getDefaultArtifacts()) .addAllCapabilities(Environments.getJavaCapabilities()) .build()); + // No need to perform transform upgrading for the Runner v1 proto. RunnerApi.Pipeline dataflowV1PipelineProto = - PipelineTranslation.toProto(pipeline, dataflowV1Components, true); + PipelineTranslation.toProto(pipeline, dataflowV1Components, true, false); if (LOG.isDebugEnabled()) { LOG.debug(