Skip to content

Commit

Permalink
Merge pull request #30061: Automatically enable Dataflow Runner v2 fo…
Browse files Browse the repository at this point in the history
…r pipeliens that use the transform upgrade feature
  • Loading branch information
chamikaramj authored Jan 20, 2024
2 parents e4b8180 + 175346e commit c0cc7c4
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,14 @@ public static RunnerApi.Pipeline toProto(
final Pipeline pipeline,
final SdkComponents components,
boolean useDeprecatedViewTransforms) {
return toProto(pipeline, components, useDeprecatedViewTransforms, true);
}

public static RunnerApi.Pipeline toProto(
final Pipeline pipeline,
final SdkComponents components,
boolean useDeprecatedViewTransforms,
boolean upgradeTransforms) {
final List<String> rootIds = new ArrayList<>();
pipeline.traverseTopologically(
new PipelineVisitor.Defaults() {
Expand Down Expand Up @@ -106,7 +114,7 @@ public void visitPrimitiveTransform(Node node) {
ExternalTranslationOptions externalTranslationOptions =
pipeline.getOptions().as(ExternalTranslationOptions.class);
List<String> urnsToOverride = externalTranslationOptions.getTransformsToOverride();
if (urnsToOverride.size() > 0) {
if (urnsToOverride.size() > 0 && upgradeTransforms) {
try (TransformUpgrader upgrader = TransformUpgrader.of()) {
res =
upgrader.upgradeTransformsViaTransformService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public class TransformUpgrader implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(TransformUpgrader.class);
private static final String UPGRADE_NAMESPACE = "transform:upgrade:";

@VisibleForTesting static final String UPGRADE_KEY = "upgraded_to_version";

private ExpansionServiceClientFactory clientFactory;

private TransformUpgrader() {
Expand Down Expand Up @@ -148,7 +150,8 @@ public RunnerApi.Pipeline upgradeTransformsViaTransformService(

for (String transformId : transformsToOverride) {
pipeline =
updateTransformViaTransformService(pipeline, transformId, expansionServiceEndpoint);
updateTransformViaTransformService(
pipeline, transformId, expansionServiceEndpoint, options);
}

if (service != null) {
Expand All @@ -165,14 +168,14 @@ public RunnerApi.Pipeline upgradeTransformsViaTransformService(
RunnerApi.Pipeline updateTransformViaTransformService(
RunnerApi.Pipeline runnerAPIpipeline,
String transformId,
Endpoints.ApiServiceDescriptor transformServiceEndpoint)
Endpoints.ApiServiceDescriptor transformServiceEndpoint,
ExternalTranslationOptions options)
throws IOException {
RunnerApi.PTransform transformToUpgrade =
runnerAPIpipeline.getComponents().getTransformsMap().get(transformId);
if (transformToUpgrade == null) {
throw new IllegalArgumentException("Could not find a transform with the ID " + transformId);
}

ByteString configRowBytes =
transformToUpgrade.getAnnotationsOrThrow(PTransformTranslation.CONFIG_ROW_KEY);
ByteString configRowSchemaBytes =
Expand Down Expand Up @@ -236,6 +239,17 @@ RunnerApi.Pipeline updateTransformViaTransformService(
newEnvironmentsWithDependencies, transformServiceEndpoint))
.build();
RunnerApi.PTransform expandedTransform = response.getTransform();

// Adds an annotation that denotes the Beam version the transform was upgraded to.
RunnerApi.PTransform.Builder expandedTransformBuilder = expandedTransform.toBuilder();
String transformServiceVersion = options.getTransformServiceBeamVersion();
if (transformServiceVersion == null || transformServiceVersion.isEmpty()) {
transformServiceVersion = "unknown";
}
expandedTransformBuilder.putAnnotations(
UPGRADE_KEY, ByteString.copyFromUtf8(transformServiceVersion));
expandedTransform = expandedTransformBuilder.build();

List<String> expandedRequirements = response.getRequirementsList();

RunnerApi.Components.Builder newComponentsBuilder = expandedComponents.toBuilder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.beam.runners.core.construction;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.auto.service.AutoService;
import java.io.ByteArrayInputStream;
Expand Down Expand Up @@ -284,6 +285,9 @@ public void testTransformUpgrade() throws Exception {
.get("TransformUpgraderTest-TestTransform");

validateTestParam(upgradedTransform, 4);

// Confirm that the upgraded transform includes the upgrade annotation.
assertTrue(upgradedTransform.getAnnotationsMap().containsKey(TransformUpgrader.UPGRADE_KEY));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import org.apache.beam.runners.core.construction.EmptyFlattenAsCreateFactory;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.External;
import org.apache.beam.runners.core.construction.ExternalTranslationOptions;
import org.apache.beam.runners.core.construction.PTransformMatchers;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
Expand Down Expand Up @@ -1101,14 +1102,25 @@ public void visitPrimitiveTransform(Node node) {
return visitor.isMultiLanguage;
}

private static boolean includesTransformUpgrades(Pipeline pipeline) {
return (pipeline
.getOptions()
.as(ExternalTranslationOptions.class)
.getTransformsToOverride()
.size()
> 0);
}

@Override
public DataflowPipelineJob run(Pipeline pipeline) {
if (DataflowRunner.isMultiLanguagePipeline(pipeline)) {
// Multi-language pipelines and pipelines that include upgrades should automatically be upgraded
// to Runner v2.
if (DataflowRunner.isMultiLanguagePipeline(pipeline) || includesTransformUpgrades(pipeline)) {
List<String> experiments = firstNonNull(options.getExperiments(), Collections.emptyList());
if (!experiments.contains("use_runner_v2")) {
LOG.info(
"Automatically enabling Dataflow Runner v2 since the pipeline used cross-language"
+ " transforms");
+ " transforms or pipeline needed a transform upgrade.");
options.setExperiments(
ImmutableList.<String>builder().addAll(experiments).add("use_runner_v2").build());
}
Expand Down Expand Up @@ -1217,8 +1229,9 @@ public DataflowPipelineJob run(Pipeline pipeline) {
.addAllDependencies(getDefaultArtifacts())
.addAllCapabilities(Environments.getJavaCapabilities())
.build());
// No need to perform transform upgrading for the Runner v1 proto.
RunnerApi.Pipeline dataflowV1PipelineProto =
PipelineTranslation.toProto(pipeline, dataflowV1Components, true);
PipelineTranslation.toProto(pipeline, dataflowV1Components, true, false);

if (LOG.isDebugEnabled()) {
LOG.debug(
Expand Down

0 comments on commit c0cc7c4

Please sign in to comment.