Skip to content

Commit

Permalink
Revert "Default to Runner v2 for Python Streaming jobs. (#15140)" (#1…
Browse files Browse the repository at this point in the history
…5210) (#15212)

This reverts commit 09d4fab.
  • Loading branch information
angoenka authored Jul 28, 2021
1 parent 8e32f3b commit ad5d202
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 16 deletions.
8 changes: 1 addition & 7 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,15 +594,9 @@ def run_pipeline(self, pipeline, options):
return result

def _maybe_add_unified_worker_missing_options(self, options):
debug_options = options.view_as(DebugOptions)
# Streaming is always portable, default to runner v2.
if options.view_as(StandardOptions).streaming:
if not debug_options.lookup_experiment('disable_runner_v2'):
debug_options.add_experiment('beam_fn_api')
debug_options.add_experiment('use_runner_v2')
debug_options.add_experiment('use_portable_job_submission')
# set default beam_fn_api experiment if use unified
# worker experiment flag exists, no-op otherwise.
debug_options = options.view_as(DebugOptions)
from apache_beam.runners.dataflow.internal import apiclient
if apiclient._use_unified_worker(options):
if not debug_options.lookup_experiment('beam_fn_api'):
Expand Down
11 changes: 2 additions & 9 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ def test_remote_runner_translation(self):
def test_streaming_create_translation(self):
remote_runner = DataflowRunner()
self.default_properties.append("--streaming")
self.default_properties.append("--experiments=disable_runner_v2")
with Pipeline(remote_runner, PipelineOptions(self.default_properties)) as p:
p | ptransform.Create([1]) # pylint: disable=expression-not-assigned
job_dict = json.loads(str(remote_runner.job))
Expand Down Expand Up @@ -839,21 +838,15 @@ def test_group_into_batches_translation_non_unified_worker(self):
'Runner determined sharding not available in Dataflow for '
'GroupIntoBatches for jobs not using Runner V2'):
_ = self._run_group_into_batches_and_get_step_properties(
True,
['--enable_streaming_engine', '--experiments=disable_runner_v2'])
True, ['--enable_streaming_engine'])

# JRH
with self.assertRaisesRegex(
ValueError,
'Runner determined sharding not available in Dataflow for '
'GroupIntoBatches for jobs not using Runner V2'):
_ = self._run_group_into_batches_and_get_step_properties(
True,
[
'--enable_streaming_engine',
'--experiments=beam_fn_api',
'--experiments=disable_runner_v2'
])
True, ['--enable_streaming_engine', '--experiments=beam_fn_api'])

def test_pack_combiners(self):
class PackableCombines(beam.PTransform):
Expand Down

0 comments on commit ad5d202

Please sign in to comment.