Skip to content

Commit

Permalink
Restore "Default to Runner v2 for Python Streaming jobs. (apache#15140)"
Browse files Browse the repository at this point in the history
This reverts commit 034ccdf.
  • Loading branch information
robertwb committed Aug 5, 2021
1 parent f759a5c commit f7c7f26
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 3 deletions.
8 changes: 7 additions & 1 deletion sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,9 +594,15 @@ def run_pipeline(self, pipeline, options):
return result

def _maybe_add_unified_worker_missing_options(self, options):
debug_options = options.view_as(DebugOptions)
# Streaming is always portable, default to runner v2.
if options.view_as(StandardOptions).streaming:
if not debug_options.lookup_experiment('disable_runner_v2'):
debug_options.add_experiment('beam_fn_api')
debug_options.add_experiment('use_runner_v2')
debug_options.add_experiment('use_portable_job_submission')
# set default beam_fn_api experiment if use unified
# worker experiment flag exists, no-op otherwise.
debug_options = options.view_as(DebugOptions)
from apache_beam.runners.dataflow.internal import apiclient
if apiclient._use_unified_worker(options):
if not debug_options.lookup_experiment('beam_fn_api'):
Expand Down
11 changes: 9 additions & 2 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ def test_remote_runner_translation(self):
def test_streaming_create_translation(self):
remote_runner = DataflowRunner()
self.default_properties.append("--streaming")
self.default_properties.append("--experiments=disable_runner_v2")
with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p:
p | ptransform.Create([1]) # pylint: disable=expression-not-assigned
job_dict = json.loads(str(remote_runner.job))
Expand Down Expand Up @@ -839,15 +840,21 @@ def test_group_into_batches_translation_non_unified_worker(self):
'Runner determined sharding not available in Dataflow for '
'GroupIntoBatches for jobs not using Runner V2'):
_ = self._run_group_into_batches_and_get_step_properties(
True, ['--enable_streaming_engine'])
True,
['--enable_streaming_engine', '--experiments=disable_runner_v2'])

# JRH
with self.assertRaisesRegex(
ValueError,
'Runner determined sharding not available in Dataflow for '
'GroupIntoBatches for jobs not using Runner V2'):
_ = self._run_group_into_batches_and_get_step_properties(
True, ['--enable_streaming_engine', '--experiments=beam_fn_api'])
True,
[
'--enable_streaming_engine',
'--experiments=beam_fn_api',
'--experiments=disable_runner_v2'
])

def test_pack_combiners(self):
class PackableCombines(beam.PTransform):
Expand Down

0 comments on commit f7c7f26

Please sign in to comment.