Skip to content

Commit

Permalink
[#24515] Ensure that portable Java pipelines on Dataflow are not able…
Browse files Browse the repository at this point in the history
… to opt out of runner v2. (#24805)

Towards #24515
  • Loading branch information
lukecwik authored Dec 29, 2022
1 parent 208a14e commit 7c736e2
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 5 deletions.
9 changes: 5 additions & 4 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,11 @@

## Breaking Changes

* Go pipelines, Python streaming pipelines, and portable Python batch pipelines on Dataflow are required to
use Runner V2. The `disable_runner_v2`, `disable_runner_v2_until_2023`, `disable_prime_runner_v2`
experiments will raise an error during pipeline construction. You can no longer specify the Dataflow worker
jar override. Note that non-portable Python batch jobs are not impacted. ([#24515](https://github.com/apache/beam/issues/24515)).
* Portable Java pipelines, Go pipelines, Python streaming pipelines, and portable Python batch
pipelines on Dataflow are required to use Runner V2. The `disable_runner_v2`,
`disable_runner_v2_until_2023`, `disable_prime_runner_v2` experiments will raise an error during
pipeline construction. You can no longer specify the Dataflow worker jar override. Note that
non-portable Java jobs and non-portable Python batch jobs are not impacted. ([#24515](https://github.com/apache/beam/issues/24515)).

## Deprecations

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,12 @@ public DataflowPipelineJob run(Pipeline pipeline) {
}
}
if (useUnifiedWorker(options)) {
if (hasExperiment(options, "disable_runner_v2")
|| hasExperiment(options, "disable_runner_v2_until_2023")
|| hasExperiment(options, "disable_prime_runner_v2")) {
throw new IllegalArgumentException(
"Runner V2 both disabled and enabled: at least one of ['beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission'] is set and also one of ['disable_runner_v2', 'disable_runner_v2_until_2023', 'disable_prime_runner_v2'] is set.");
}
List<String> experiments =
new ArrayList<>(options.getExperiments()); // non-null if useUnifiedWorker is true
if (!experiments.contains("use_runner_v2")) {
Expand All @@ -1116,6 +1122,18 @@ public DataflowPipelineJob run(Pipeline pipeline) {
logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
if (shouldActAsStreaming(pipeline)) {
options.setStreaming(true);

if (useUnifiedWorker(options)) {
options.setEnableStreamingEngine(true);
List<String> experiments =
new ArrayList<>(options.getExperiments()); // non-null if useUnifiedWorker is true
if (!experiments.contains("enable_streaming_engine")) {
experiments.add("enable_streaming_engine");
}
if (!experiments.contains("enable_windmill_service")) {
experiments.add("enable_windmill_service");
}
}
}

if (!ExperimentalOptions.hasExperiment(options, "disable_projection_pushdown")) {
Expand Down Expand Up @@ -2412,7 +2430,8 @@ static String getDefaultContainerVersion(DataflowPipelineOptions options) {
static boolean useUnifiedWorker(DataflowPipelineOptions options) {
return hasExperiment(options, "beam_fn_api")
|| hasExperiment(options, "use_runner_v2")
|| hasExperiment(options, "use_unified_worker");
|| hasExperiment(options, "use_unified_worker")
|| hasExperiment(options, "use_portable_job_submission");
}

static boolean useStreamingEngine(DataflowPipelineOptions options) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1745,6 +1745,63 @@ public void testSdkHarnessConfigurationPrime() throws IOException {
this.verifySdkHarnessConfiguration(options);
}

@Test
public void testSettingAnyFnApiExperimentEnablesUnifiedWorker() throws Exception {
for (String experiment :
ImmutableList.of(
"beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")) {
DataflowPipelineOptions options = buildPipelineOptions();
ExperimentalOptions.addExperiment(options, experiment);
Pipeline p = Pipeline.create(options);
p.apply(Create.of("A"));
p.run();
assertFalse(options.isEnableStreamingEngine());
assertThat(
options.getExperiments(),
containsInAnyOrder(
"beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission"));
}

for (String experiment :
ImmutableList.of(
"beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")) {
DataflowPipelineOptions options = buildPipelineOptions();
options.setStreaming(true);
ExperimentalOptions.addExperiment(options, experiment);
Pipeline p = Pipeline.create(options);
p.apply(Create.of("A"));
p.run();
assertTrue(options.isEnableStreamingEngine());
assertThat(
options.getExperiments(),
containsInAnyOrder(
"beam_fn_api",
"use_runner_v2",
"use_unified_worker",
"use_portable_job_submission",
"enable_windmill_service",
"enable_streaming_engine"));
}
}

@Test
public void testSettingConflictingEnableAndDisableExperimentsThrowsException() throws Exception {
for (String experiment :
ImmutableList.of(
"beam_fn_api", "use_runner_v2", "use_unified_worker", "use_portable_job_submission")) {
for (String disabledExperiment :
ImmutableList.of(
"disable_runner_v2", "disable_runner_v2_until_2023", "disable_prime_runner_v2")) {
DataflowPipelineOptions options = buildPipelineOptions();
ExperimentalOptions.addExperiment(options, experiment);
ExperimentalOptions.addExperiment(options, disabledExperiment);
Pipeline p = Pipeline.create(options);
p.apply(Create.of("A"));
assertThrows("Runner V2 both disabled and enabled", IllegalArgumentException.class, p::run);
}
}
}

private void verifyMapStateUnsupported(PipelineOptions options) throws Exception {
Pipeline p = Pipeline.create(options);
p.apply(Create.of(KV.of(13, 42)))
Expand Down

0 comments on commit 7c736e2

Please sign in to comment.