From ca674aad086e4d1a40e8c01598073030a22484fa Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Mon, 10 Jul 2023 16:31:55 -0700 Subject: [PATCH] Remove unneeded Dataflow Runner v1 code. (#27196) --- sdks/python/apache_beam/coders/coders.py | 133 --- .../apache_beam/coders/coders_test_common.py | 41 - sdks/python/apache_beam/coders/row_coder.py | 9 - .../apache_beam/coders/row_coder_test.py | 11 - .../runners/dataflow/dataflow_runner.py | 828 +----------------- .../runners/dataflow/dataflow_runner_test.py | 369 +------- .../runners/dataflow/internal/apiclient.py | 95 +- .../dataflow/internal/apiclient_test.py | 53 -- .../runners/dataflow/internal/names.py | 76 -- .../runners/dataflow/ptransform_overrides.py | 126 --- 10 files changed, 49 insertions(+), 1692 deletions(-) diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index d4ca99b80fb3..7c5c8e09303d 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -280,29 +280,6 @@ def _get_component_coders(self): # refined in user-defined Coders. return [] - def as_cloud_object(self, coders_context=None): - """For internal use only; no backwards-compatibility guarantees. - - Returns Google Cloud Dataflow API description of this coder.""" - # This is an internal detail of the Coder API and does not need to be - # refined in user-defined Coders. - - value = { - # We pass coders in the form "$" to make the - # job description JSON more readable. Data before the $ is ignored by - # the worker. - '@type': serialize_coder(self), - 'component_encodings': [ - component.as_cloud_object(coders_context) - for component in self._get_component_coders() - ], - } - - if coders_context: - value['pipeline_proto_coder_id'] = coders_context.get_id(self) - - return value - def __repr__(self): return self.__class__.__name__ @@ -493,11 +470,6 @@ def is_deterministic(self): def to_type_hint(self): return bytes - def as_cloud_object(self, coders_context=None): - return { - '@type': 'kind:bytes', - } - def __eq__(self, other): return type(self) == type(other) @@ -667,11 +639,6 @@ def is_deterministic(self): def to_type_hint(self): return int - def as_cloud_object(self, coders_context=None): - return { - '@type': 'kind:varint', - } - def __eq__(self, other): return type(self) == type(other) @@ -846,21 +813,6 @@ def is_deterministic(self): # GroupByKey operations. return False - def as_cloud_object(self, coders_context=None, is_pair_like=True): - value = super().as_cloud_object(coders_context) - # We currently use this coder in places where we cannot infer the coder to - # use for the value type in a more granular way. In places where the - # service expects a pair, it checks for the "is_pair_like" key, in which - # case we would fail without the hack below. - if is_pair_like: - value['is_pair_like'] = True - value['component_encodings'] = [ - self.as_cloud_object(coders_context, is_pair_like=False), - self.as_cloud_object(coders_context, is_pair_like=False) - ] - - return value - # We allow .key_coder() and .value_coder() to be called on PickleCoder since # we can't always infer the return values of lambdas in ParDo operations, the # result of which may be used in a GroupBykey. @@ -983,21 +935,6 @@ def as_deterministic_coder(self, step_label, error_message=None): def to_type_hint(self): return Any - def as_cloud_object(self, coders_context=None, is_pair_like=True): - value = super().as_cloud_object(coders_context) - # We currently use this coder in places where we cannot infer the coder to - # use for the value type in a more granular way. In places where the - # service expects a pair, it checks for the "is_pair_like" key, in which - # case we would fail without the hack below. - if is_pair_like: - value['is_pair_like'] = True - value['component_encodings'] = [ - self.as_cloud_object(coders_context, is_pair_like=False), - self.as_cloud_object(coders_context, is_pair_like=False) - ] - - return value - # We allow .key_coder() and .value_coder() to be called on FastPrimitivesCoder # since we can't always infer the return values of lambdas in ParDo # operations, the result of which may be used in a GroupBykey. @@ -1231,19 +1168,6 @@ def from_type_hint(cls, typehint, registry): # type: (typehints.TupleConstraint, CoderRegistry) -> TupleCoder return cls([registry.get_coder(t) for t in typehint.tuple_types]) - def as_cloud_object(self, coders_context=None): - if self.is_kv_coder(): - return { - '@type': 'kind:pair', - 'is_pair_like': True, - 'component_encodings': [ - component.as_cloud_object(coders_context) - for component in self._get_component_coders() - ], - } - - return super().as_cloud_object(coders_context) - def _get_component_coders(self): # type: () -> Tuple[Coder, ...] return self.coders() @@ -1353,15 +1277,6 @@ def as_deterministic_coder(self, step_label, error_message=None): return type(self)( self._elem_coder.as_deterministic_coder(step_label, error_message)) - def as_cloud_object(self, coders_context=None): - return { - '@type': 'kind:stream', - 'is_stream_like': True, - 'component_encodings': [ - self._elem_coder.as_cloud_object(coders_context) - ], - } - def value_coder(self): return self._elem_coder @@ -1409,11 +1324,6 @@ def __init__(self): from apache_beam.transforms import window super().__init__(window.GlobalWindow()) - def as_cloud_object(self, coders_context=None): - return { - '@type': 'kind:global_window', - } - Coder.register_structured_urn( common_urns.coders.GLOBAL_WINDOW.urn, GlobalWindowCoder) @@ -1428,11 +1338,6 @@ def is_deterministic(self): # type: () -> bool return True - def as_cloud_object(self, coders_context=None): - return { - '@type': 'kind:interval_window', - } - def __eq__(self, other): return type(self) == type(other) @@ -1466,16 +1371,6 @@ def is_deterministic(self): c.is_deterministic() for c in [self.wrapped_value_coder, self.timestamp_coder, self.window_coder]) - def as_cloud_object(self, coders_context=None): - return { - '@type': 'kind:windowed_value', - 'is_wrapper': True, - 'component_encodings': [ - component.as_cloud_object(coders_context) - for component in self._get_component_coders() - ], - } - def _get_component_coders(self): # type: () -> List[Coder] return [self.wrapped_value_coder, self.window_coder] @@ -1527,10 +1422,6 @@ def is_deterministic(self): # type: () -> bool return self.wrapped_value_coder.is_deterministic() - def as_cloud_object(self, coders_context=None): - raise NotImplementedError( - "as_cloud_object not supported for ParamWindowedValueCoder") - def __repr__(self): return 'ParamWindowedValueCoder[%s]' % self.wrapped_value_coder @@ -1577,14 +1468,6 @@ def estimate_size(self, value): def value_coder(self): return self._value_coder - def as_cloud_object(self, coders_context=None): - return { - '@type': 'kind:length_prefix', - 'component_encodings': [ - self._value_coder.as_cloud_object(coders_context) - ], - } - def _get_component_coders(self): # type: () -> Tuple[Coder, ...] return (self._value_coder, ) @@ -1680,14 +1563,6 @@ def is_deterministic(self): # type: () -> bool return self._key_coder.is_deterministic() - def as_cloud_object(self, coders_context=None): - return { - '@type': 'kind:sharded_key', - 'component_encodings': [ - self._key_coder.as_cloud_object(coders_context) - ], - } - def to_type_hint(self): from apache_beam.typehints import sharded_key_type return sharded_key_type.ShardedKeyTypeConstraint( @@ -1738,14 +1613,6 @@ def _get_component_coders(self) -> List[Coder]: def is_deterministic(self) -> bool: return self._window_coder.is_deterministic() - def as_cloud_object(self, coders_context=None): - return { - '@type': 'kind:custom_window', - 'component_encodings': [ - self._window_coder.as_cloud_object(coders_context) - ], - } - def __repr__(self): return 'TimestampPrefixingWindowCoder[%r]' % self._window_coder diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 8b6674aebec7..70582e7992a6 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -387,16 +387,6 @@ def test_timer_coder(self): def test_tuple_coder(self): kv_coder = coders.TupleCoder((coders.VarIntCoder(), coders.BytesCoder())) - # Verify cloud object representation - self.assertEqual({ - '@type': 'kind:pair', - 'is_pair_like': True, - 'component_encodings': [ - coders.VarIntCoder().as_cloud_object(), - coders.BytesCoder().as_cloud_object() - ], - }, - kv_coder.as_cloud_object()) # Test binary representation self.assertEqual(b'\x04abc', kv_coder.encode((4, b'abc'))) # Test unnested @@ -424,13 +414,6 @@ def test_utf8_coder(self): def test_iterable_coder(self): iterable_coder = coders.IterableCoder(coders.VarIntCoder()) - # Verify cloud object representation - self.assertEqual({ - '@type': 'kind:stream', - 'is_stream_like': True, - 'component_encodings': [coders.VarIntCoder().as_cloud_object()] - }, - iterable_coder.as_cloud_object()) # Test unnested self.check_coder(iterable_coder, [1], [-1, 0, 100]) # Test nested @@ -507,16 +490,6 @@ def test_windowedvalue_coder_paneinfo(self): def test_windowed_value_coder(self): coder = coders.WindowedValueCoder( coders.VarIntCoder(), coders.GlobalWindowCoder()) - # Verify cloud object representation - self.assertEqual({ - '@type': 'kind:windowed_value', - 'is_wrapper': True, - 'component_encodings': [ - coders.VarIntCoder().as_cloud_object(), - coders.GlobalWindowCoder().as_cloud_object(), - ], - }, - coder.as_cloud_object()) # Test binary representation self.assertEqual( b'\x7f\xdf;dZ\x1c\xac\t\x00\x00\x00\x01\x0f\x01', @@ -618,8 +591,6 @@ def test_proto_coder(self): def test_global_window_coder(self): coder = coders.GlobalWindowCoder() value = window.GlobalWindow() - # Verify cloud object representation - self.assertEqual({'@type': 'kind:global_window'}, coder.as_cloud_object()) # Test binary representation self.assertEqual(b'', coder.encode(value)) self.assertEqual(value, coder.decode(b'')) @@ -630,12 +601,6 @@ def test_global_window_coder(self): def test_length_prefix_coder(self): coder = coders.LengthPrefixCoder(coders.BytesCoder()) - # Verify cloud object representation - self.assertEqual({ - '@type': 'kind:length_prefix', - 'component_encodings': [coders.BytesCoder().as_cloud_object()] - }, - coder.as_cloud_object()) # Test binary representation self.assertEqual(b'\x00', coder.encode(b'')) self.assertEqual(b'\x01a', coder.encode(b'a')) @@ -725,12 +690,6 @@ def test_sharded_key_coder(self): for key, bytes_repr, key_coder in key_and_coders: coder = coders.ShardedKeyCoder(key_coder) - # Verify cloud object representation - self.assertEqual({ - '@type': 'kind:sharded_key', - 'component_encodings': [key_coder.as_cloud_object()] - }, - coder.as_cloud_object()) # Test str repr self.assertEqual('%s' % coder, 'ShardedKeyCoder[%s]' % key_coder) diff --git a/sdks/python/apache_beam/coders/row_coder.py b/sdks/python/apache_beam/coders/row_coder.py index 19424fa1f12b..7765ccebc26f 100644 --- a/sdks/python/apache_beam/coders/row_coder.py +++ b/sdks/python/apache_beam/coders/row_coder.py @@ -17,8 +17,6 @@ # pytype: skip-file -from google.protobuf import json_format - from apache_beam.coders import typecoders from apache_beam.coders.coder_impl import LogicalTypeCoderImpl from apache_beam.coders.coder_impl import RowCoderImpl @@ -91,13 +89,6 @@ def as_deterministic_coder(self, step_label, error_message=None): def to_type_hint(self): return self._type_hint - def as_cloud_object(self, coders_context=None): - value = super().as_cloud_object(coders_context) - - value['schema'] = json_format.MessageToJson(self.schema).encode('utf-8') - - return value - def __hash__(self): return hash(self.schema.SerializeToString()) diff --git a/sdks/python/apache_beam/coders/row_coder_test.py b/sdks/python/apache_beam/coders/row_coder_test.py index dbca3e7f69c9..6ac982835cb3 100644 --- a/sdks/python/apache_beam/coders/row_coder_test.py +++ b/sdks/python/apache_beam/coders/row_coder_test.py @@ -22,7 +22,6 @@ from itertools import chain import numpy as np -from google.protobuf import json_format from numpy.testing import assert_array_equal import apache_beam as beam @@ -398,16 +397,6 @@ def test_row_coder_fail_early_bad_schema(self): self.assertRaisesRegex( ValueError, "type_with_no_typeinfo", lambda: RowCoder(schema_proto)) - def test_row_coder_cloud_object_schema(self): - schema_proto = schema_pb2.Schema(id='some-cloud-object-schema') - schema_proto_json = json_format.MessageToJson(schema_proto).encode('utf-8') - - coder = RowCoder(schema_proto) - - cloud_object = coder.as_cloud_object() - - self.assertEqual(schema_proto_json, cloud_object['schema']) - def test_batch_encode_decode(self): coder = RowCoder(typing_to_runner_api(Person).row_type.schema).get_impl() seq_out = coder_impl.create_OutputStream() diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 674d05c64ec8..315dc8ff7001 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -22,26 +22,18 @@ """ # pytype: skip-file -import base64 import logging import os import threading import time -import traceback import warnings from collections import defaultdict from subprocess import DEVNULL from typing import TYPE_CHECKING from typing import List -from urllib.parse import quote -from urllib.parse import quote_from_bytes -from urllib.parse import unquote_to_bytes import apache_beam as beam from apache_beam import coders -from apache_beam import error -from apache_beam.internal import pickler -from apache_beam.internal.gcp import json_value from apache_beam.options.pipeline_options import DebugOptions from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import SetupOptions @@ -51,23 +43,14 @@ from apache_beam.options.pipeline_options import WorkerOptions from apache_beam.portability import common_urns from apache_beam.portability.api import beam_runner_api_pb2 -from apache_beam.pvalue import AsSideInput from apache_beam.runners.common import DoFnSignature from apache_beam.runners.common import group_by_key_input_visitor -from apache_beam.runners.dataflow.internal import names from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api -from apache_beam.runners.dataflow.internal.names import PropertyNames -from apache_beam.runners.dataflow.internal.names import TransformNames from apache_beam.runners.runner import PipelineResult from apache_beam.runners.runner import PipelineRunner from apache_beam.runners.runner import PipelineState -from apache_beam.runners.runner import PValueCache -from apache_beam.transforms import window -from apache_beam.transforms.display import DisplayData -from apache_beam.transforms.sideinputs import SIDE_INPUT_PREFIX from apache_beam.typehints import typehints from apache_beam.utils import processes -from apache_beam.utils import proto_utils from apache_beam.utils.interactive_utils import is_in_notebook from apache_beam.utils.plugin import BeamPlugin @@ -103,9 +86,6 @@ class DataflowRunner(PipelineRunner): # Imported here to avoid circular dependencies. # TODO: Remove the apache_beam.pipeline dependency in CreatePTransformOverride - from apache_beam.runners.dataflow.ptransform_overrides import CombineValuesPTransformOverride - from apache_beam.runners.dataflow.ptransform_overrides import CreatePTransformOverride - from apache_beam.runners.dataflow.ptransform_overrides import ReadPTransformOverride from apache_beam.runners.dataflow.ptransform_overrides import NativeReadPTransformOverride # These overrides should be applied before the proto representation of the @@ -114,19 +94,7 @@ class DataflowRunner(PipelineRunner): NativeReadPTransformOverride(), ] # type: List[PTransformOverride] - # These overrides should be applied after the proto representation of the - # graph is created. - _NON_PORTABLE_PTRANSFORM_OVERRIDES = [ - CombineValuesPTransformOverride(), - CreatePTransformOverride(), - ReadPTransformOverride(), - ] # type: List[PTransformOverride] - def __init__(self, cache=None): - # Cache of CloudWorkflowStep protos generated while the runner - # "executes" a pipeline. - self._cache = cache if cache is not None else PValueCache() - self._unique_step_id = 0 self._default_environment = None def is_fnapi_compatible(self): @@ -136,10 +104,6 @@ def apply(self, transform, input, options): _check_and_add_missing_options(options) return super().apply(transform, input, options) - def _get_unique_step_name(self): - self._unique_step_id += 1 - return 's%s' % self._unique_step_id - @staticmethod def poll_for_job_completion( runner, result, duration, state_update_callback=None): @@ -262,7 +226,7 @@ def _only_element(iterable): return element @staticmethod - def side_input_visitor(is_runner_v2=False, deterministic_key_coders=True): + def side_input_visitor(deterministic_key_coders=True): # Imported here to avoid circular dependencies. # pylint: disable=wrong-import-order, wrong-import-position from apache_beam.pipeline import PipelineVisitor @@ -300,9 +264,8 @@ def visit_transform(self, transform_node): 'Unsupported access pattern for %r: %r' % (transform_node.full_label, access_pattern)) new_side_inputs.append(new_side_input) - if is_runner_v2: - transform_node.side_inputs = new_side_inputs - transform_node.transform.side_inputs = new_side_inputs + transform_node.side_inputs = new_side_inputs + transform_node.transform.side_inputs = new_side_inputs return SideInputVisitor() @@ -363,20 +326,12 @@ def _adjust_pipeline_for_dataflow_v2(self, pipeline): not pipeline._options.view_as( TypeOptions).allow_non_deterministic_key_coders)) - def _check_for_unsupported_features_on_non_portable_worker(self, pipeline): - pipeline.visit(self.combinefn_visitor()) - def run_pipeline(self, pipeline, options, pipeline_proto=None): """Remotely executes entire pipeline or parts reachable from node.""" if _is_runner_v2_disabled(options): - debug_options = options.view_as(DebugOptions) - if not debug_options.lookup_experiment('disable_runner_v2_until_v2.50'): - raise ValueError( - 'disable_runner_v2 is deprecated in Beam Python ' + - beam.version.__version__ + - ' and this execution mode will be removed in a future Beam SDK. ' - 'If needed, please use: ' - '"--experiments=disable_runner_v2_until_v2.50".') + raise ValueError( + 'Disabling Runner V2 no longer supported ' + 'using Beam Python %s.' % beam.version.__version__) # Label goog-dataflow-notebook if job is started from notebook. if is_in_notebook(): @@ -397,26 +352,12 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): 'Google Cloud Dataflow runner not available, ' 'please install apache_beam[gcp]') - if pipeline_proto or pipeline.contains_external_transforms: - if _is_runner_v2_disabled(options): - raise ValueError( - 'This pipeline contains cross language transforms, ' - 'which requires Runner V2.') - if not _is_runner_v2(options): - _LOGGER.info( - 'Automatically enabling Dataflow Runner V2 since the ' - 'pipeline used cross-language transforms.') - _add_runner_v2_missing_options(options) - - is_runner_v2 = _is_runner_v2(options) - if not is_runner_v2: - self._check_for_unsupported_features_on_non_portable_worker(pipeline) - # Convert all side inputs into a form acceptable to Dataflow. if pipeline: + pipeline.visit(self.combinefn_visitor()) + pipeline.visit( self.side_input_visitor( - _is_runner_v2(options), deterministic_key_coders=not options.view_as( TypeOptions).allow_non_deterministic_key_coders)) @@ -430,10 +371,6 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): "Native sinks no longer implemented; " "ignoring use_legacy_bq_sink.") - from apache_beam.runners.dataflow.ptransform_overrides import GroupIntoBatchesWithShardedKeyPTransformOverride - pipeline.replace_all( - [GroupIntoBatchesWithShardedKeyPTransformOverride(self, options)]) - if pipeline_proto: self.proto_pipeline = pipeline_proto @@ -449,7 +386,7 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): self._default_environment.container_image) else: artifacts = environments.python_sdk_dependencies(options) - if artifacts and _is_runner_v2(options): + if artifacts: _LOGGER.info( "Pipeline has additional dependencies to be installed " "in SDK worker container, consider using the SDK " @@ -501,11 +438,6 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): known_runner_urns=frozenset(), partial=True) - if not is_runner_v2: - # Performing configured PTransform overrides which should not be reflected - # in the proto representation of the graph. - pipeline.replace_all(DataflowRunner._NON_PORTABLE_PTRANSFORM_OVERRIDES) - # Add setup_options for all the BeamPlugin imports setup_options = options.view_as(SetupOptions) plugins = BeamPlugin.get_all_plugin_paths() @@ -523,16 +455,6 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): self.job = apiclient.Job(options, self.proto_pipeline) - # TODO: Consider skipping these for all use_portable_job_submission jobs. - if pipeline: - # Dataflow Runner v1 requires output type of the Flatten to be the same as - # the inputs, hence we enforce that here. Dataflow Runner v2 does not - # require this. - pipeline.visit(self.flatten_input_visitor()) - - # Trigger a traversal of all reachable nodes. - self.visit_transforms(pipeline, options) - test_options = options.view_as(TestOptions) # If it is a dry run, return without submitting the job. if test_options.dry_run: @@ -557,11 +479,6 @@ def run_pipeline(self, pipeline, options, pipeline_proto=None): result.metric_results = self._metrics return result - def _get_typehint_based_encoding(self, typehint, window_coder): - """Returns an encoding based on a typehint object.""" - return self._get_cloud_encoding( - self._get_coder(typehint, window_coder=window_coder)) - @staticmethod def _get_coder(typehint, window_coder): """Returns a coder based on a typehint object.""" @@ -570,197 +487,6 @@ def _get_coder(typehint, window_coder): coders.registry.get_coder(typehint), window_coder=window_coder) return coders.registry.get_coder(typehint) - def _get_cloud_encoding(self, coder, unused=None): - """Returns an encoding based on a coder object.""" - if not isinstance(coder, coders.Coder): - raise TypeError( - 'Coder object must inherit from coders.Coder: %s.' % str(coder)) - return coder.as_cloud_object(self.proto_context.coders) - - def _get_side_input_encoding(self, input_encoding): - """Returns an encoding for the output of a view transform. - - Args: - input_encoding: encoding of current transform's input. Side inputs need - this because the service will check that input and output types match. - - Returns: - An encoding that matches the output and input encoding. This is essential - for the View transforms introduced to produce side inputs to a ParDo. - """ - return { - '@type': 'kind:stream', - 'component_encodings': [input_encoding], - 'is_stream_like': { - 'value': True - }, - } - - def _get_encoded_output_coder( - self, transform_node, window_value=True, output_tag=None): - """Returns the cloud encoding of the coder for the output of a transform.""" - - if output_tag in transform_node.outputs: - element_type = transform_node.outputs[output_tag].element_type - elif len(transform_node.outputs) == 1: - output_tag = DataflowRunner._only_element(transform_node.outputs.keys()) - # TODO(robertwb): Handle type hints for multi-output transforms. - element_type = transform_node.outputs[output_tag].element_type - - else: - # TODO(silviuc): Remove this branch (and assert) when typehints are - # propagated everywhere. Returning an 'Any' as type hint will trigger - # usage of the fallback coder (i.e., cPickler). - element_type = typehints.Any - if window_value: - # All outputs have the same windowing. So getting the coder from an - # arbitrary window is fine. - output_tag = next(iter(transform_node.outputs.keys())) - window_coder = ( - transform_node.outputs[output_tag].windowing.windowfn. - get_window_coder()) - else: - window_coder = None - return self._get_typehint_based_encoding(element_type, window_coder) - - def get_pcoll_with_auto_sharding(self): - if not hasattr(self, '_pcoll_with_auto_sharding'): - return set() - return self._pcoll_with_auto_sharding - - def add_pcoll_with_auto_sharding(self, applied_ptransform): - if not hasattr(self, '_pcoll_with_auto_sharding'): - self.__setattr__('_pcoll_with_auto_sharding', set()) - output = DataflowRunner._only_element(applied_ptransform.outputs.keys()) - self._pcoll_with_auto_sharding.add( - applied_ptransform.outputs[output]._unique_name()) - - def _add_step(self, step_kind, step_label, transform_node, side_tags=()): - """Creates a Step object and adds it to the cache.""" - # Import here to avoid adding the dependency for local running scenarios. - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.runners.dataflow.internal import apiclient - step = apiclient.Step(step_kind, self._get_unique_step_name()) - self.job.proto.steps.append(step.proto) - step.add_property(PropertyNames.USER_NAME, step_label) - # Cache the node/step association for the main output of the transform node. - - # External transforms may not use 'None' as an output tag. - output_tags = ([None] + - list(side_tags) if None in transform_node.outputs.keys() else - list(transform_node.outputs.keys())) - - # We have to cache output for all tags since some transforms may produce - # multiple outputs. - for output_tag in output_tags: - self._cache.cache_output(transform_node, output_tag, step) - - # Finally, we add the display data items to the pipeline step. - # If the transform contains no display data then an empty list is added. - step.add_property( - PropertyNames.DISPLAY_DATA, - [ - item.get_dict() - for item in DisplayData.create_from(transform_node.transform).items - ]) - - if transform_node.resource_hints: - step.add_property( - PropertyNames.RESOURCE_HINTS, - { - hint: quote_from_bytes(value) - for (hint, value) in transform_node.resource_hints.items() - }) - - return step - - def _add_singleton_step( - self, - label, - full_label, - tag, - input_step, - windowing_strategy, - access_pattern): - """Creates a CollectionToSingleton step used to handle ParDo side inputs.""" - # Import here to avoid adding the dependency for local running scenarios. - from apache_beam.runners.dataflow.internal import apiclient - step = apiclient.Step(TransformNames.COLLECTION_TO_SINGLETON, label) - self.job.proto.steps.append(step.proto) - step.add_property(PropertyNames.USER_NAME, full_label) - step.add_property( - PropertyNames.PARALLEL_INPUT, - { - '@type': 'OutputReference', - PropertyNames.STEP_NAME: input_step.proto.name, - PropertyNames.OUTPUT_NAME: input_step.get_output(tag) - }) - step.encoding = self._get_side_input_encoding(input_step.encoding) - - output_info = { - PropertyNames.USER_NAME: '%s.%s' % (full_label, PropertyNames.OUTPUT), - PropertyNames.ENCODING: step.encoding, - PropertyNames.OUTPUT_NAME: PropertyNames.OUT - } - if common_urns.side_inputs.MULTIMAP.urn == access_pattern: - output_info[PropertyNames.USE_INDEXED_FORMAT] = True - step.add_property(PropertyNames.OUTPUT_INFO, [output_info]) - - step.add_property( - PropertyNames.WINDOWING_STRATEGY, - self.serialize_windowing_strategy( - windowing_strategy, self._default_environment)) - return step - - def run_Impulse(self, transform_node, options): - step = self._add_step( - TransformNames.READ, transform_node.full_label, transform_node) - step.add_property(PropertyNames.FORMAT, 'impulse') - encoded_impulse_element = coders.WindowedValueCoder( - coders.BytesCoder(), - coders.coders.GlobalWindowCoder()).get_impl().encode_nested( - window.GlobalWindows.windowed_value(b'')) - if _is_runner_v2(options): - encoded_impulse_as_str = self.byte_array_to_json_string( - encoded_impulse_element) - else: - encoded_impulse_as_str = base64.b64encode(encoded_impulse_element).decode( - 'ascii') - - step.add_property(PropertyNames.IMPULSE_ELEMENT, encoded_impulse_as_str) - - step.encoding = self._get_encoded_output_coder(transform_node) - step.add_property( - PropertyNames.OUTPUT_INFO, - [{ - PropertyNames.USER_NAME: ( - '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), - PropertyNames.ENCODING: step.encoding, - PropertyNames.OUTPUT_NAME: PropertyNames.OUT - }]) - - def run_Flatten(self, transform_node, options): - step = self._add_step( - TransformNames.FLATTEN, transform_node.full_label, transform_node) - inputs = [] - for one_input in transform_node.inputs: - input_step = self._cache.get_pvalue(one_input) - inputs.append({ - '@type': 'OutputReference', - PropertyNames.STEP_NAME: input_step.proto.name, - PropertyNames.OUTPUT_NAME: input_step.get_output(one_input.tag) - }) - step.add_property(PropertyNames.INPUTS, inputs) - step.encoding = self._get_encoded_output_coder(transform_node) - step.add_property( - PropertyNames.OUTPUT_INFO, - [{ - PropertyNames.USER_NAME: ( - '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), - PropertyNames.ENCODING: step.encoding, - PropertyNames.OUTPUT_NAME: PropertyNames.OUT - }]) - # TODO(srohde): Remove this after internal usages have been removed. def apply_GroupByKey(self, transform, pcoll, options): return transform.expand(pcoll) @@ -784,512 +510,6 @@ def _verify_gbk_coders(self, transform, pcoll): coders.registry.verify_deterministic( coder.key_coder(), 'GroupByKey operation "%s"' % transform.label) - def run_GroupByKey(self, transform_node, options): - input_tag = transform_node.inputs[0].tag - input_step = self._cache.get_pvalue(transform_node.inputs[0]) - - # Verify that the GBK's parent has a KV coder. - self._verify_gbk_coders(transform_node.transform, transform_node.inputs[0]) - - step = self._add_step( - TransformNames.GROUP, transform_node.full_label, transform_node) - step.add_property( - PropertyNames.PARALLEL_INPUT, - { - '@type': 'OutputReference', - PropertyNames.STEP_NAME: input_step.proto.name, - PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag) - }) - step.encoding = self._get_encoded_output_coder(transform_node) - step.add_property( - PropertyNames.OUTPUT_INFO, - [{ - PropertyNames.USER_NAME: ( - '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), - PropertyNames.ENCODING: step.encoding, - PropertyNames.OUTPUT_NAME: PropertyNames.OUT - }]) - windowing = transform_node.transform.get_windowing(transform_node.inputs) - step.add_property( - PropertyNames.SERIALIZED_FN, - self.serialize_windowing_strategy(windowing, self._default_environment)) - - def run_ExternalTransform(self, transform_node, options): - # Adds a dummy step to the Dataflow job description so that inputs and - # outputs are mapped correctly in the presence of external transforms. - # - # Note that Dataflow Python multi-language pipelines use Portable Job - # Submission by default, hence this step and rest of the Dataflow step - # definitions defined here are not used at Dataflow service but we have to - # maintain the mapping correctly till we can fully drop the Dataflow step - # definitions from the SDK. - - # AppliedTransform node outputs have to be updated to correctly map the - # outputs for external transforms. - transform_node.outputs = ({ - output.tag: output - for output in transform_node.outputs.values() - }) - - self.run_Impulse(transform_node, options) - - def run_ParDo(self, transform_node, options): - transform = transform_node.transform - input_tag = transform_node.inputs[0].tag - input_step = self._cache.get_pvalue(transform_node.inputs[0]) - - # Attach side inputs. - si_dict = {} - si_labels = {} - full_label_counts = defaultdict(int) - lookup_label = lambda side_pval: si_labels[side_pval] - named_inputs = transform_node.named_inputs() - label_renames = {} - for ix, side_pval in enumerate(transform_node.side_inputs): - assert isinstance(side_pval, AsSideInput) - step_name = 'SideInput-' + self._get_unique_step_name() - si_label = ((SIDE_INPUT_PREFIX + '%d-%s') % - (ix, transform_node.full_label)) - old_label = (SIDE_INPUT_PREFIX + '%d') % ix - - label_renames[old_label] = si_label - - assert old_label in named_inputs - pcollection_label = '%s.%s' % ( - side_pval.pvalue.producer.full_label.split('/')[-1], - side_pval.pvalue.tag if side_pval.pvalue.tag else 'out') - si_full_label = '%s/%s(%s.%s)' % ( - transform_node.full_label, - side_pval.__class__.__name__, - pcollection_label, - full_label_counts[pcollection_label]) - - # Count the number of times the same PCollection is a side input - # to the same ParDo. - full_label_counts[pcollection_label] += 1 - - self._add_singleton_step( - step_name, - si_full_label, - side_pval.pvalue.tag, - self._cache.get_pvalue(side_pval.pvalue), - side_pval.pvalue.windowing, - side_pval._side_input_data().access_pattern) - si_dict[si_label] = { - '@type': 'OutputReference', - PropertyNames.STEP_NAME: step_name, - PropertyNames.OUTPUT_NAME: PropertyNames.OUT - } - si_labels[side_pval] = si_label - - # Now create the step for the ParDo transform being handled. - transform_name = transform_node.full_label.rsplit('/', 1)[-1] - step = self._add_step( - TransformNames.DO, - transform_node.full_label + - ('/{}'.format(transform_name) if transform_node.side_inputs else ''), - transform_node, - transform_node.transform.output_tags) - transform_proto = self.proto_context.transforms.get_proto(transform_node) - transform_id = self.proto_context.transforms.get_id(transform_node) - is_runner_v2 = _is_runner_v2(options) - # Patch side input ids to be unique across a given pipeline. - if (label_renames and - transform_proto.spec.urn == common_urns.primitives.PAR_DO.urn): - # Patch PTransform proto. - for old, new in label_renames.items(): - transform_proto.inputs[new] = transform_proto.inputs[old] - del transform_proto.inputs[old] - - # Patch ParDo proto. - proto_type, _ = beam.PTransform._known_urns[transform_proto.spec.urn] - proto = proto_utils.parse_Bytes(transform_proto.spec.payload, proto_type) - for old, new in label_renames.items(): - proto.side_inputs[new].CopyFrom(proto.side_inputs[old]) - del proto.side_inputs[old] - transform_proto.spec.payload = proto.SerializeToString() - # We need to update the pipeline proto. - del self.proto_pipeline.components.transforms[transform_id] - ( - self.proto_pipeline.components.transforms[transform_id].CopyFrom( - transform_proto)) - # The data transmitted in SERIALIZED_FN is different depending on whether - # this is a runner v2 pipeline or not. - if is_runner_v2: - serialized_data = transform_id - else: - serialized_data = pickler.dumps( - self._pardo_fn_data(transform_node, lookup_label)) - step.add_property(PropertyNames.SERIALIZED_FN, serialized_data) - # TODO(BEAM-8882): Enable once dataflow service doesn't reject this. - # step.add_property(PropertyNames.PIPELINE_PROTO_TRANSFORM_ID, transform_id) - step.add_property( - PropertyNames.PARALLEL_INPUT, - { - '@type': 'OutputReference', - PropertyNames.STEP_NAME: input_step.proto.name, - PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag) - }) - # Add side inputs if any. - step.add_property(PropertyNames.NON_PARALLEL_INPUTS, si_dict) - - # Generate description for the outputs. The output names - # will be 'None' for main output and '' for a tagged output. - outputs = [] - - all_output_tags = list(transform_proto.outputs.keys()) - - # Some external transforms require output tags to not be modified. - # So we randomly select one of the output tags as the main output and - # leave others as side outputs. Transform execution should not change - # dependending on which output tag we choose as the main output here. - # Also, some SDKs do not work correctly if output tags are modified. So for - # external transforms, we leave tags unmodified. - # - # Python SDK uses 'None' as the tag of the main output. - main_output_tag = 'None' - - step.encoding = self._get_encoded_output_coder( - transform_node, output_tag=main_output_tag) - - side_output_tags = set(all_output_tags).difference({main_output_tag}) - - # Add the main output to the description. - outputs.append({ - PropertyNames.USER_NAME: ( - '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), - PropertyNames.ENCODING: step.encoding, - PropertyNames.OUTPUT_NAME: main_output_tag - }) - for side_tag in side_output_tags: - # The assumption here is that all outputs will have the same typehint - # and coder as the main output. This is certainly the case right now - # but conceivably it could change in the future. - encoding = self._get_encoded_output_coder( - transform_node, output_tag=side_tag) - outputs.append({ - PropertyNames.USER_NAME: ( - '%s.%s' % (transform_node.full_label, side_tag)), - PropertyNames.ENCODING: encoding, - PropertyNames.OUTPUT_NAME: side_tag - }) - - step.add_property(PropertyNames.OUTPUT_INFO, outputs) - - # Add the restriction encoding if we are a splittable DoFn - restriction_coder = transform.get_restriction_coder() - if restriction_coder: - step.add_property( - PropertyNames.RESTRICTION_ENCODING, - self._get_cloud_encoding(restriction_coder)) - - if options.view_as(StandardOptions).streaming: - is_stateful_dofn = (DoFnSignature(transform.dofn).is_stateful_dofn()) - if is_stateful_dofn: - step.add_property(PropertyNames.USES_KEYED_STATE, 'true') - - # Also checks whether the step allows shardable keyed states. - # TODO(BEAM-11360): remove this when migrated to portable job - # submission since we only consider supporting the property in runner - # v2. - for pcoll in transform_node.outputs.values(): - if pcoll._unique_name() in self.get_pcoll_with_auto_sharding(): - step.add_property(PropertyNames.ALLOWS_SHARDABLE_STATE, 'true') - # Currently we only allow auto-sharding to be enabled through the - # GroupIntoBatches transform. So we also add the following property - # which GroupIntoBatchesDoFn has, to allow the backend to perform - # graph optimization. - step.add_property(PropertyNames.PRESERVES_KEYS, 'true') - break - - @staticmethod - def _pardo_fn_data(transform_node, get_label): - transform = transform_node.transform - si_tags_and_types = [ # pylint: disable=protected-access - (get_label(side_pval), side_pval.__class__, side_pval._view_options()) - for side_pval in transform_node.side_inputs] - return ( - transform.fn, - transform.args, - transform.kwargs, - si_tags_and_types, - transform_node.inputs[0].windowing) - - def run_CombineValuesReplacement(self, transform_node, options): - transform = transform_node.transform.transform - input_tag = transform_node.inputs[0].tag - input_step = self._cache.get_pvalue(transform_node.inputs[0]) - step = self._add_step( - TransformNames.COMBINE, transform_node.full_label, transform_node) - transform_id = self.proto_context.transforms.get_id(transform_node.parent) - - # The data transmitted in SERIALIZED_FN is different depending on whether - # this is a runner v2 pipeline or not. - if _is_runner_v2(options): - # Fnapi pipelines send the transform ID of the CombineValues transform's - # parent composite because Dataflow expects the ID of a CombinePerKey - # transform. - serialized_data = transform_id - else: - # Combiner functions do not take deferred side-inputs (i.e. PValues) and - # therefore the code to handle extra args/kwargs is simpler than for the - # DoFn's of the ParDo transform. In the last, empty argument is where - # side inputs information would go. - serialized_data = pickler.dumps( - (transform.fn, transform.args, transform.kwargs, ())) - step.add_property(PropertyNames.SERIALIZED_FN, serialized_data) - # TODO(BEAM-8882): Enable once dataflow service doesn't reject this. - # step.add_property(PropertyNames.PIPELINE_PROTO_TRANSFORM_ID, transform_id) - step.add_property( - PropertyNames.PARALLEL_INPUT, - { - '@type': 'OutputReference', - PropertyNames.STEP_NAME: input_step.proto.name, - PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag) - }) - # Note that the accumulator must not have a WindowedValue encoding, while - # the output of this step does in fact have a WindowedValue encoding. - accumulator_encoding = self._get_cloud_encoding( - transform.fn.get_accumulator_coder()) - output_encoding = self._get_encoded_output_coder(transform_node) - - step.encoding = output_encoding - step.add_property(PropertyNames.ENCODING, accumulator_encoding) - # Generate description for main output 'out.' - outputs = [] - # Add the main output to the description. - outputs.append({ - PropertyNames.USER_NAME: ( - '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), - PropertyNames.ENCODING: step.encoding, - PropertyNames.OUTPUT_NAME: PropertyNames.OUT - }) - step.add_property(PropertyNames.OUTPUT_INFO, outputs) - - def run_Read(self, transform_node, options): - transform = transform_node.transform - step = self._add_step( - TransformNames.READ, transform_node.full_label, transform_node) - # TODO(mairbek): refactor if-else tree to use registerable functions. - # Initialize the source specific properties. - - standard_options = options.view_as(StandardOptions) - if not hasattr(transform.source, 'format'): - # If a format is not set, we assume the source to be a custom source. - source_dict = {} - - source_dict['spec'] = { - '@type': names.SOURCE_TYPE, - names.SERIALIZED_SOURCE_KEY: pickler.dumps(transform.source) - } - - try: - source_dict['metadata'] = { - 'estimated_size_bytes': json_value.get_typed_value_descriptor( - transform.source.estimate_size()) - } - except error.RuntimeValueProviderError: - # Size estimation is best effort, and this error is by value provider. - _LOGGER.info( - 'Could not estimate size of source %r due to ' + \ - 'RuntimeValueProviderError', transform.source) - except Exception: # pylint: disable=broad-except - # Size estimation is best effort. So we log the error and continue. - _LOGGER.info( - 'Could not estimate size of source %r due to an exception: %s', - transform.source, - traceback.format_exc()) - - step.add_property(PropertyNames.SOURCE_STEP_INPUT, source_dict) - elif transform.source.format == 'pubsub': - if not standard_options.streaming: - raise ValueError( - 'Cloud Pub/Sub is currently available for use ' - 'only in streaming pipelines.') - # Only one of topic or subscription should be set. - if transform.source.full_subscription: - step.add_property( - PropertyNames.PUBSUB_SUBSCRIPTION, - transform.source.full_subscription) - elif transform.source.full_topic: - step.add_property( - PropertyNames.PUBSUB_TOPIC, transform.source.full_topic) - if transform.source.id_label: - step.add_property( - PropertyNames.PUBSUB_ID_LABEL, transform.source.id_label) - if transform.source.with_attributes: - # Setting this property signals Dataflow runner to return full - # PubsubMessages instead of just the data part of the payload. - step.add_property(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '') - - if transform.source.timestamp_attribute is not None: - step.add_property( - PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, - transform.source.timestamp_attribute) - else: - raise ValueError( - 'Source %r has unexpected format %s.' % - (transform.source, transform.source.format)) - - if not hasattr(transform.source, 'format'): - step.add_property(PropertyNames.FORMAT, names.SOURCE_FORMAT) - else: - step.add_property(PropertyNames.FORMAT, transform.source.format) - - # Wrap coder in WindowedValueCoder: this is necessary as the encoding of a - # step should be the type of value outputted by each step. Read steps - # automatically wrap output values in a WindowedValue wrapper, if necessary. - # This is also necessary for proper encoding for size estimation. - # Using a GlobalWindowCoder as a place holder instead of the default - # PickleCoder because GlobalWindowCoder is known coder. - # TODO(robertwb): Query the collection for the windowfn to extract the - # correct coder. - coder = coders.WindowedValueCoder( - coders.registry.get_coder(transform_node.outputs[None].element_type), - coders.coders.GlobalWindowCoder()) - - step.encoding = self._get_cloud_encoding(coder) - step.add_property( - PropertyNames.OUTPUT_INFO, - [{ - PropertyNames.USER_NAME: ( - '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), - PropertyNames.ENCODING: step.encoding, - PropertyNames.OUTPUT_NAME: PropertyNames.OUT - }]) - - def run__NativeWrite(self, transform_node, options): - transform = transform_node.transform - input_tag = transform_node.inputs[0].tag - input_step = self._cache.get_pvalue(transform_node.inputs[0]) - step = self._add_step( - TransformNames.WRITE, transform_node.full_label, transform_node) - # TODO(mairbek): refactor if-else tree to use registerable functions. - # Initialize the sink specific properties. - if transform.sink.format == 'pubsub': - standard_options = options.view_as(StandardOptions) - if not standard_options.streaming: - raise ValueError( - 'Cloud Pub/Sub is currently available for use ' - 'only in streaming pipelines.') - step.add_property(PropertyNames.PUBSUB_TOPIC, transform.sink.full_topic) - if transform.sink.id_label: - step.add_property( - PropertyNames.PUBSUB_ID_LABEL, transform.sink.id_label) - # Setting this property signals Dataflow runner that the PCollection - # contains PubsubMessage objects instead of just raw data. - step.add_property(PropertyNames.PUBSUB_SERIALIZED_ATTRIBUTES_FN, '') - if transform.sink.timestamp_attribute is not None: - step.add_property( - PropertyNames.PUBSUB_TIMESTAMP_ATTRIBUTE, - transform.sink.timestamp_attribute) - else: - raise ValueError( - 'Sink %r has unexpected format %s.' % - (transform.sink, transform.sink.format)) - step.add_property(PropertyNames.FORMAT, transform.sink.format) - - # Wrap coder in WindowedValueCoder: this is necessary for proper encoding - # for size estimation. Using a GlobalWindowCoder as a place holder instead - # of the default PickleCoder because GlobalWindowCoder is known coder. - # TODO(robertwb): Query the collection for the windowfn to extract the - # correct coder. - coder = coders.WindowedValueCoder( - transform.sink.coder, coders.coders.GlobalWindowCoder()) - step.encoding = self._get_cloud_encoding(coder) - step.add_property(PropertyNames.ENCODING, step.encoding) - step.add_property( - PropertyNames.PARALLEL_INPUT, - { - '@type': 'OutputReference', - PropertyNames.STEP_NAME: input_step.proto.name, - PropertyNames.OUTPUT_NAME: input_step.get_output(input_tag) - }) - - def run_TestStream(self, transform_node, options): - from apache_beam.testing.test_stream import ElementEvent - from apache_beam.testing.test_stream import ProcessingTimeEvent - from apache_beam.testing.test_stream import WatermarkEvent - standard_options = options.view_as(StandardOptions) - if not standard_options.streaming: - raise ValueError( - 'TestStream is currently available for use ' - 'only in streaming pipelines.') - - transform = transform_node.transform - step = self._add_step( - TransformNames.READ, transform_node.full_label, transform_node) - step.add_property( - PropertyNames.SERIALIZED_FN, - self.proto_context.transforms.get_id(transform_node)) - step.add_property(PropertyNames.FORMAT, 'test_stream') - test_stream_payload = beam_runner_api_pb2.TestStreamPayload() - # TestStream source doesn't do any decoding of elements, - # so we won't set test_stream_payload.coder_id. - output_coder = transform._infer_output_coder() # pylint: disable=protected-access - for event in transform._events: - new_event = test_stream_payload.events.add() - if isinstance(event, ElementEvent): - for tv in event.timestamped_values: - element = new_event.element_event.elements.add() - element.encoded_element = output_coder.encode(tv.value) - element.timestamp = tv.timestamp.micros - elif isinstance(event, ProcessingTimeEvent): - new_event.processing_time_event.advance_duration = ( - event.advance_by.micros) - elif isinstance(event, WatermarkEvent): - new_event.watermark_event.new_watermark = event.new_watermark.micros - serialized_payload = self.byte_array_to_json_string( - test_stream_payload.SerializeToString()) - step.add_property(PropertyNames.SERIALIZED_TEST_STREAM, serialized_payload) - - step.encoding = self._get_encoded_output_coder(transform_node) - step.add_property( - PropertyNames.OUTPUT_INFO, - [{ - PropertyNames.USER_NAME: ( - '%s.%s' % (transform_node.full_label, PropertyNames.OUT)), - PropertyNames.ENCODING: step.encoding, - PropertyNames.OUTPUT_NAME: PropertyNames.OUT - }]) - - # We must mark this method as not a test or else its name is a matcher for - # nosetest tests. - run_TestStream.__test__ = False # type: ignore[attr-defined] - - @classmethod - def serialize_windowing_strategy(cls, windowing, default_environment): - from apache_beam.runners import pipeline_context - context = pipeline_context.PipelineContext( - default_environment=default_environment) - windowing_proto = windowing.to_runner_api(context) - return cls.byte_array_to_json_string( - beam_runner_api_pb2.MessageWithComponents( - components=context.to_runner_api(), - windowing_strategy=windowing_proto).SerializeToString()) - - @classmethod - def deserialize_windowing_strategy(cls, serialized_data): - # Imported here to avoid circular dependencies. - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.runners import pipeline_context - from apache_beam.transforms.core import Windowing - proto = beam_runner_api_pb2.MessageWithComponents() - proto.ParseFromString(cls.json_string_to_byte_array(serialized_data)) - return Windowing.from_runner_api( - proto.windowing_strategy, - pipeline_context.PipelineContext(proto.components)) - - @staticmethod - def byte_array_to_json_string(raw_bytes): - """Implements org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString.""" - return quote(raw_bytes) - - @staticmethod - def json_string_to_byte_array(encoded_string): - """Implements org.apache.beam.sdk.util.StringUtils.jsonStringToByteArray.""" - return unquote_to_bytes(encoded_string) - def get_default_gcp_region(self): """Get a default value for Google Cloud region according to https://cloud.google.com/compute/docs/gcloud-compute/#default-properties. @@ -1348,6 +568,8 @@ def _check_and_add_missing_options(options): options.view_as( GoogleCloudOptions).dataflow_service_options = dataflow_service_options + _add_runner_v2_missing_options(options) + # Ensure that prime is specified as an experiment if specified as a dataflow # service option if 'enable_prime' in dataflow_service_options: @@ -1359,11 +581,6 @@ def _check_and_add_missing_options(options): # Runner v2 only supports using streaming engine (aka windmill service) if options.view_as(StandardOptions).streaming: google_cloud_options = options.view_as(GoogleCloudOptions) - if _is_runner_v2_disabled(options): - raise ValueError( - 'Disabling Runner V2 no longer supported for streaming pipeline ' - 'using Beam Python %s.' % beam.version.__version__) - if (not google_cloud_options.enable_streaming_engine and (debug_options.lookup_experiment("enable_windmill_service") or debug_options.lookup_experiment("enable_streaming_engine"))): @@ -1380,29 +597,6 @@ def _check_and_add_missing_options(options): google_cloud_options.enable_streaming_engine = True debug_options.add_experiment("enable_streaming_engine") debug_options.add_experiment("enable_windmill_service") - _add_runner_v2_missing_options(debug_options) - elif (debug_options.lookup_experiment('enable_prime') or - debug_options.lookup_experiment('beam_fn_api') or - debug_options.lookup_experiment('use_unified_worker') or - debug_options.lookup_experiment('use_runner_v2') or - debug_options.lookup_experiment('use_portable_job_submission')): - if _is_runner_v2_disabled(options): - raise ValueError( - """Runner V2 both disabled and enabled: at least one of - ['enable_prime', 'beam_fn_api', 'use_unified_worker', 'use_runner_v2', - 'use_portable_job_submission'] is set and also one of - ['disable_runner_v2', 'disable_runner_v2_until_2023', - 'disable_prime_runner_v2'] is set.""") - _add_runner_v2_missing_options(debug_options) - - -def _is_runner_v2(options): - # Type: (PipelineOptions) -> bool - - """Returns true if runner v2 is enabled.""" - _check_and_add_missing_options(options) - return options.view_as(DebugOptions).lookup_experiment( - 'use_runner_v2', default=False) def _is_runner_v2_disabled(options): diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index b00644d13fd7..1e084b98278d 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -19,19 +19,13 @@ # pytype: skip-file -import json import unittest -from datetime import datetime -from itertools import product import mock -from parameterized import param -from parameterized import parameterized import apache_beam as beam import apache_beam.transforms as ptransform from apache_beam.options.pipeline_options import DebugOptions -from apache_beam.options.pipeline_options import GoogleCloudOptions from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.pipeline import AppliedPTransform from apache_beam.pipeline import Pipeline @@ -45,18 +39,13 @@ from apache_beam.runners import create_runner from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException -from apache_beam.runners.dataflow.dataflow_runner import PropertyNames -from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2 -from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2_disabled +from apache_beam.runners.dataflow.dataflow_runner import _check_and_add_missing_options from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api from apache_beam.runners.runner import PipelineState from apache_beam.testing.extra_assertions import ExtraAssertionsMixin from apache_beam.testing.test_pipeline import TestPipeline from apache_beam.transforms import combiners from apache_beam.transforms import environments -from apache_beam.transforms import window -from apache_beam.transforms.core import Windowing -from apache_beam.transforms.display import DisplayDataItem from apache_beam.typehints import typehints # Protect against environments where apitools library is not available. @@ -262,49 +251,6 @@ def test_remote_runner_translation(self): | 'Do' >> ptransform.FlatMap(lambda x: [(x, x)]) | ptransform.GroupByKey()) - def test_remote_runner_display_data(self): - remote_runner = DataflowRunner() - p = Pipeline( - remote_runner, options=PipelineOptions(self.default_properties)) - - now = datetime.now() - # pylint: disable=expression-not-assigned - ( - p | ptransform.Create([1, 2, 3, 4, 5]) - | 'Do' >> SpecialParDo(SpecialDoFn(), now)) - - # TODO(https://github.com/apache/beam/issues/18012) Enable runner API on - # this test. - p.run(test_runner_api=False) - job_dict = json.loads(str(remote_runner.job)) - steps = [ - step for step in job_dict['steps'] - if len(step['properties'].get('display_data', [])) > 0 - ] - step = steps[1] - disp_data = step['properties']['display_data'] - nspace = SpecialParDo.__module__ + '.' - expected_data = [{ - 'type': 'TIMESTAMP', - 'namespace': nspace + 'SpecialParDo', - 'value': DisplayDataItem._format_value(now, 'TIMESTAMP'), - 'key': 'a_time' - }, - { - 'type': 'STRING', - 'namespace': nspace + 'SpecialParDo', - 'value': nspace + 'SpecialParDo', - 'key': 'a_class', - 'shortValue': 'SpecialParDo' - }, - { - 'type': 'INTEGER', - 'namespace': nspace + 'SpecialDoFn', - 'value': 42, - 'key': 'dofn_value' - }] - self.assertUnhashableCountEqual(disp_data, expected_data) - def test_group_by_key_input_visitor_with_valid_inputs(self): p = TestPipeline() pcoll1 = PCollection(p) @@ -391,15 +337,6 @@ def test_gbk_then_flatten_input_visitor(self): self.assertEqual(flat.element_type, none_str_pc.element_type) self.assertEqual(flat.element_type, none_int_pc.element_type) - def test_serialize_windowing_strategy(self): - # This just tests the basic path; more complete tests - # are in window_test.py. - strategy = Windowing(window.FixedWindows(10)) - self.assertEqual( - strategy, - DataflowRunner.deserialize_windowing_strategy( - DataflowRunner.serialize_windowing_strategy(strategy, None))) - def test_side_input_visitor(self): p = TestPipeline() pc = p | beam.Create([]) @@ -411,8 +348,7 @@ def test_side_input_visitor(self): beam.pvalue.AsSingleton(pc), beam.pvalue.AsMultiMap(pc)) applied_transform = AppliedPTransform(None, transform, "label", {'pc': pc}) - DataflowRunner.side_input_visitor( - is_runner_v2=True).visit_transform(applied_transform) + DataflowRunner.side_input_visitor().visit_transform(applied_transform) self.assertEqual(2, len(applied_transform.side_inputs)) self.assertEqual( common_urns.side_inputs.ITERABLE.urn, @@ -504,116 +440,6 @@ def test_get_default_gcp_region_ignores_error( result = runner.get_default_gcp_region() self.assertIsNone(result) - def test_combine_values_translation(self): - runner = DataflowRunner() - - with beam.Pipeline(runner=runner, - options=PipelineOptions(self.default_properties)) as p: - ( # pylint: disable=expression-not-assigned - p - | beam.Create([('a', [1, 2]), ('b', [3, 4])]) - | beam.CombineValues(lambda v, _: sum(v))) - - job_dict = json.loads(str(runner.job)) - self.assertIn( - 'CombineValues', set(step['kind'] for step in job_dict['steps'])) - - def _find_step(self, job, step_name): - job_dict = json.loads(str(job)) - maybe_step = [ - s for s in job_dict['steps'] - if s['properties']['user_name'] == step_name - ] - self.assertTrue(maybe_step, 'Could not find step {}'.format(step_name)) - return maybe_step[0] - - def expect_correct_override(self, job, step_name, step_kind): - """Expects that a transform was correctly overriden.""" - - # If the typing information isn't being forwarded correctly, the component - # encodings here will be incorrect. - expected_output_info = [{ - "encoding": { - "@type": "kind:windowed_value", - "component_encodings": [{ - "@type": "kind:bytes" - }, { - "@type": "kind:global_window" - }], - "is_wrapper": True - }, - "output_name": "out", - "user_name": step_name + ".out" - }] - - step = self._find_step(job, step_name) - self.assertEqual(step['kind'], step_kind) - - # The display data here is forwarded because the replace transform is - # subclassed from iobase.Read. - self.assertGreater(len(step['properties']['display_data']), 0) - self.assertEqual(step['properties']['output_info'], expected_output_info) - - def test_read_create_translation(self): - runner = DataflowRunner() - - with beam.Pipeline(runner=runner, - options=PipelineOptions(self.default_properties)) as p: - # pylint: disable=expression-not-assigned - p | beam.Create([b'a', b'b', b'c']) - - self.expect_correct_override(runner.job, 'Create/Read', 'ParallelRead') - - def test_read_pubsub_translation(self): - runner = DataflowRunner() - - self.default_properties.append("--streaming") - - with beam.Pipeline(runner=runner, - options=PipelineOptions(self.default_properties)) as p: - # pylint: disable=expression-not-assigned - p | beam.io.ReadFromPubSub(topic='projects/project/topics/topic') - - self.expect_correct_override( - runner.job, 'ReadFromPubSub/Read', 'ParallelRead') - - def test_gbk_translation(self): - runner = DataflowRunner() - with beam.Pipeline(runner=runner, - options=PipelineOptions(self.default_properties)) as p: - # pylint: disable=expression-not-assigned - p | beam.Create([(1, 2)]) | beam.GroupByKey() - - expected_output_info = [{ - "encoding": { - "@type": "kind:windowed_value", - "component_encodings": [{ - "@type": "kind:pair", - "component_encodings": [{ - "@type": "kind:varint" - }, - { - "@type": "kind:stream", - "component_encodings": [{ - "@type": "kind:varint" - }], - "is_stream_like": True - }], - "is_pair_like": True - }, { - "@type": "kind:global_window" - }], - "is_wrapper": True - }, - "output_name": "out", - "user_name": "GroupByKey.out" - }] # yapf: disable - - gbk_step = self._find_step(runner.job, 'GroupByKey') - self.assertEqual(gbk_step['kind'], 'GroupByKey') - self.assertEqual( - gbk_step['properties']['output_info'], expected_output_info) - @unittest.skip( 'https://github.com/apache/beam/issues/18716: enable once ' 'CombineFnVisitor is fixed') @@ -646,43 +472,6 @@ def teardown(self, *args, **kwargs): except ValueError: self.fail('ValueError raised unexpectedly') - def _run_group_into_batches_and_get_step_properties( - self, with_sharded_key, additional_properties): - self.default_properties.append('--streaming') - for property in additional_properties: - self.default_properties.append(property) - - runner = DataflowRunner() - with beam.Pipeline(runner=runner, - options=PipelineOptions(self.default_properties)) as p: - # pylint: disable=expression-not-assigned - input = p | beam.Create([('a', 1), ('a', 1), ('b', 3), ('b', 4)]) - if with_sharded_key: - ( - input | beam.GroupIntoBatches.WithShardedKey(2) - | beam.Map(lambda key_values: (key_values[0].key, key_values[1]))) - step_name = ( - 'WithShardedKey/GroupIntoBatches/ParDo(_GroupIntoBatchesDoFn)') - else: - input | beam.GroupIntoBatches(2) - step_name = 'GroupIntoBatches/ParDo(_GroupIntoBatchesDoFn)' - - return self._find_step(runner.job, step_name)['properties'] - - def test_group_into_batches_translation(self): - properties = self._run_group_into_batches_and_get_step_properties( - True, ['--enable_streaming_engine', '--experiments=use_runner_v2']) - self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], 'true') - self.assertEqual(properties[PropertyNames.ALLOWS_SHARDABLE_STATE], 'true') - self.assertEqual(properties[PropertyNames.PRESERVES_KEYS], 'true') - - def test_group_into_batches_translation_non_sharded(self): - properties = self._run_group_into_batches_and_get_step_properties( - False, ['--enable_streaming_engine', '--experiments=use_runner_v2']) - self.assertEqual(properties[PropertyNames.USES_KEYED_STATE], 'true') - self.assertNotIn(PropertyNames.ALLOWS_SHARDABLE_STATE, properties) - self.assertNotIn(PropertyNames.PRESERVES_KEYS, properties) - def test_pack_combiners(self): class PackableCombines(beam.PTransform): def annotations(self): @@ -711,141 +500,44 @@ def expand(self, pcoll): self.assertNotIn(unpacked_maximum_step_name, transform_names) self.assertIn(packed_step_name, transform_names) - @parameterized.expand([ - param(memory_hint='min_ram'), - param(memory_hint='minRam'), - ]) - def test_resource_hints_translation(self, memory_hint): - runner = DataflowRunner() - self.default_properties.append('--resource_hint=accelerator=some_gpu') - self.default_properties.append(f'--resource_hint={memory_hint}=20GB') - with beam.Pipeline(runner=runner, - options=PipelineOptions(self.default_properties)) as p: - # pylint: disable=expression-not-assigned - ( - p - | beam.Create([1]) - | 'MapWithHints' >> beam.Map(lambda x: x + 1).with_resource_hints( - min_ram='10GB', - accelerator='type:nvidia-tesla-k80;count:1;install-nvidia-drivers' - )) - - step = self._find_step(runner.job, 'MapWithHints') - self.assertEqual( - step['properties']['resource_hints'], - { - 'beam:resources:min_ram_bytes:v1': '20000000000', - 'beam:resources:accelerator:v1': \ - 'type%3Anvidia-tesla-k80%3Bcount%3A1%3Binstall-nvidia-drivers' - }) - - @parameterized.expand([ - ( - "%s_%s" % (enable_option, disable_option), - enable_option, - disable_option) - for (enable_option, - disable_option) in product([ - False, - 'enable_prime', - 'beam_fn_api', - 'use_unified_worker', - 'use_runner_v2', - 'use_portable_job_submission' - ], - [ - False, - 'disable_runner_v2', - 'disable_runner_v2_until_2023', - 'disable_prime_runner_v2' - ]) - ]) - def test_batch_is_runner_v2(self, name, enable_option, disable_option): - options = PipelineOptions( - (['--experiments=%s' % enable_option] if enable_option else []) + - (['--experiments=%s' % disable_option] if disable_option else [])) - if (enable_option and disable_option): - with self.assertRaisesRegex(ValueError, - 'Runner V2 both disabled and enabled'): - _is_runner_v2(options) - elif enable_option: - self.assertTrue(_is_runner_v2(options)) - self.assertFalse(_is_runner_v2_disabled(options)) - for expected in ['beam_fn_api', - 'use_unified_worker', - 'use_runner_v2', - 'use_portable_job_submission']: - self.assertTrue( - options.view_as(DebugOptions).lookup_experiment(expected, False)) - if enable_option == 'enable_prime': - self.assertIn( - 'enable_prime', - options.view_as(GoogleCloudOptions).dataflow_service_options) - elif disable_option: - self.assertFalse(_is_runner_v2(options)) - self.assertTrue(_is_runner_v2_disabled(options)) - else: - self.assertFalse(_is_runner_v2(options)) - - @parameterized.expand([ - ( - "%s_%s" % (enable_option, disable_option), - enable_option, - disable_option) - for (enable_option, - disable_option) in product([ - False, - 'enable_prime', - 'beam_fn_api', - 'use_unified_worker', - 'use_runner_v2', - 'use_portable_job_submission' - ], - [ - False, - 'disable_runner_v2', - 'disable_runner_v2_until_2023', - 'disable_prime_runner_v2' - ]) - ]) - def test_streaming_is_runner_v2(self, name, enable_option, disable_option): - options = PipelineOptions( - ['--streaming'] + - (['--experiments=%s' % enable_option] if enable_option else []) + - (['--experiments=%s' % disable_option] if disable_option else [])) - if disable_option: - with self.assertRaisesRegex( - ValueError, - 'Disabling Runner V2 no longer supported for streaming pipeline'): - _is_runner_v2(options) - else: - self.assertTrue(_is_runner_v2(options)) - for expected in ['beam_fn_api', - 'use_unified_worker', - 'use_runner_v2', - 'use_portable_job_submission', - 'enable_windmill_service', - 'enable_streaming_engine']: - self.assertTrue( - options.view_as(DebugOptions).lookup_experiment(expected, False)) - if enable_option == 'enable_prime': - self.assertIn( - 'enable_prime', - options.view_as(GoogleCloudOptions).dataflow_service_options) + def test_batch_is_runner_v2(self): + options = PipelineOptions() + _check_and_add_missing_options(options) + for expected in ['beam_fn_api', + 'use_unified_worker', + 'use_runner_v2', + 'use_portable_job_submission']: + self.assertTrue( + options.view_as(DebugOptions).lookup_experiment(expected, False), + expected) + + def test_streaming_is_runner_v2(self): + options = PipelineOptions(['--streaming']) + _check_and_add_missing_options(options) + for expected in ['beam_fn_api', + 'use_unified_worker', + 'use_runner_v2', + 'use_portable_job_submission', + 'enable_windmill_service', + 'enable_streaming_engine']: + self.assertTrue( + options.view_as(DebugOptions).lookup_experiment(expected, False), + expected) def test_dataflow_service_options_enable_prime_sets_runner_v2(self): options = PipelineOptions(['--dataflow_service_options=enable_prime']) - self.assertTrue(_is_runner_v2(options)) + _check_and_add_missing_options(options) for expected in ['beam_fn_api', 'use_unified_worker', 'use_runner_v2', 'use_portable_job_submission']: self.assertTrue( - options.view_as(DebugOptions).lookup_experiment(expected, False)) + options.view_as(DebugOptions).lookup_experiment(expected, False), + expected) options = PipelineOptions( ['--streaming', '--dataflow_service_options=enable_prime']) - self.assertTrue(_is_runner_v2(options)) + _check_and_add_missing_options(options) for expected in ['beam_fn_api', 'use_unified_worker', 'use_runner_v2', @@ -853,7 +545,8 @@ def test_dataflow_service_options_enable_prime_sets_runner_v2(self): 'enable_windmill_service', 'enable_streaming_engine']: self.assertTrue( - options.view_as(DebugOptions).lookup_experiment(expected, False)) + options.view_as(DebugOptions).lookup_experiment(expected, False), + expected) if __name__ == '__main__': diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py index bffcc6d66349..ff1beeab510d 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py @@ -65,7 +65,6 @@ from apache_beam.runners.common import validate_pipeline_graph from apache_beam.runners.dataflow.internal import names from apache_beam.runners.dataflow.internal.clients import dataflow -from apache_beam.runners.dataflow.internal.names import PropertyNames from apache_beam.runners.internal import names as shared_names from apache_beam.runners.portability.stager import Stager from apache_beam.transforms import DataflowDistributionCounter @@ -86,63 +85,6 @@ _PYTHON_VERSIONS_SUPPORTED_BY_DATAFLOW = ['3.8', '3.9', '3.10', '3.11'] -class Step(object): - """Wrapper for a dataflow Step protobuf.""" - def __init__(self, step_kind, step_name, additional_properties=None): - self.step_kind = step_kind - self.step_name = step_name - self.proto = dataflow.Step(kind=step_kind, name=step_name) - self.proto.properties = {} - self._additional_properties = [] - - if additional_properties is not None: - for (n, v, t) in additional_properties: - self.add_property(n, v, t) - - def add_property(self, name, value, with_type=False): - self._additional_properties.append((name, value, with_type)) - self.proto.properties.additionalProperties.append( - dataflow.Step.PropertiesValue.AdditionalProperty( - key=name, value=to_json_value(value, with_type=with_type))) - - def _get_outputs(self): - """Returns a list of all output labels for a step.""" - outputs = [] - for p in self.proto.properties.additionalProperties: - if p.key == PropertyNames.OUTPUT_INFO: - for entry in p.value.array_value.entries: - for entry_prop in entry.object_value.properties: - if entry_prop.key == PropertyNames.OUTPUT_NAME: - outputs.append(entry_prop.value.string_value) - return outputs - - def __reduce__(self): - """Reduce hook for pickling the Step class more easily.""" - return (Step, (self.step_kind, self.step_name, self._additional_properties)) - - def get_output(self, tag=None): - """Returns name if it is one of the outputs or first output if name is None. - - Args: - tag: tag of the output as a string or None if we want to get the - name of the first output. - - Returns: - The name of the output associated with the tag or the first output - if tag was None. - - Raises: - ValueError: if the tag does not exist within outputs. - """ - outputs = self._get_outputs() - if tag is None or len(outputs) == 1: - return outputs[0] - else: - if tag not in outputs: - raise ValueError('Cannot find named output: %s in %s.' % (tag, outputs)) - return tag - - class Environment(object): """Wrapper for a dataflow Environment protobuf.""" def __init__( @@ -152,7 +94,6 @@ def __init__( environment_version, proto_pipeline_staged_url, proto_pipeline=None): - from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2 self.standard_options = options.view_as(StandardOptions) self.google_cloud_options = options.view_as(GoogleCloudOptions) self.worker_options = options.view_as(WorkerOptions) @@ -192,10 +133,7 @@ def __init__( if self.standard_options.streaming: job_type = 'FNAPI_STREAMING' else: - if _is_runner_v2(options): - job_type = 'FNAPI_BATCH' - else: - job_type = 'PYTHON_BATCH' + job_type = 'FNAPI_BATCH' self.proto.version.additionalProperties.extend([ dataflow.Environment.VersionValue.AdditionalProperty( key='job_type', value=to_json_value(job_type)), @@ -297,7 +235,7 @@ def __init__( container_image.capabilities.append(capability) pool.sdkHarnessContainerImages.append(container_image) - if not _is_runner_v2(options) or not pool.sdkHarnessContainerImages: + if not pool.sdkHarnessContainerImages: pool.workerHarnessContainerImage = ( get_container_image_from_options(options)) elif len(pool.sdkHarnessContainerImages) == 1: @@ -554,11 +492,7 @@ def __init__(self, options, root_staging_location=None): self._root_staging_location = ( root_staging_location or self.google_cloud_options.staging_location) - from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2 - if _is_runner_v2(options): - self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION - else: - self.environment_version = _LEGACY_ENVIRONMENT_MAJOR_VERSION + self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION if self.google_cloud_options.no_auth: credentials = None @@ -1202,46 +1136,31 @@ def get_container_image_from_options(pipeline_options): Returns: str: Container image for remote execution. """ - from apache_beam.runners.dataflow.dataflow_runner import _is_runner_v2_disabled worker_options = pipeline_options.view_as(WorkerOptions) if worker_options.sdk_container_image: return worker_options.sdk_container_image - is_runner_v2 = not _is_runner_v2_disabled(pipeline_options) - # Legacy and runner v2 exist in different repositories. # Set to legacy format, override if runner v2 container_repo = names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY - image_name = '{repository}/python{major}{minor}'.format( + image_name = '{repository}/beam_python{major}.{minor}_sdk'.format( repository=container_repo, major=sys.version_info[0], minor=sys.version_info[1]) - if is_runner_v2: - image_name = '{repository}/beam_python{major}.{minor}_sdk'.format( - repository=container_repo, - major=sys.version_info[0], - minor=sys.version_info[1]) - - image_tag = _get_required_container_version(is_runner_v2) + image_tag = _get_required_container_version() return image_name + ':' + image_tag -def _get_required_container_version(is_runner_v2): +def _get_required_container_version(): """For internal use only; no backwards-compatibility guarantees. - Args: - is_runner_v2 (bool): True if and only if pipeline is using runner v2. - Returns: str: The tag of worker container images in GCR that corresponds to current version of the SDK. """ if 'dev' in beam_version.__version__: - if is_runner_v2: - return names.BEAM_FNAPI_CONTAINER_VERSION - else: - return names.BEAM_CONTAINER_VERSION + return names.BEAM_FNAPI_CONTAINER_VERSION else: return _get_container_image_tag() diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py index 89ac58a727bf..22e779a8c274 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py @@ -653,25 +653,6 @@ def test_pinned_worker_harness_image_tag_used_in_dev_sdk(self): sys.version_info[1], names.BEAM_FNAPI_CONTAINER_VERSION))) - # batch, legacy pipeline. - pipeline_options = pipeline_options = PipelineOptions([ - '--temp_location', - 'gs://any-location/temp', - '--experiments=disable_runner_v2_until_v2.50' - ]) - env = apiclient.Environment( - [], #packages - pipeline_options, - '2.0.0', #any environment version - FAKE_PIPELINE_URL) - self.assertEqual( - env.proto.workerPools[0].workerHarnessContainerImage, - ( - names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python%d%d:%s' % ( - sys.version_info[0], - sys.version_info[1], - names.BEAM_CONTAINER_VERSION))) - @mock.patch( 'apache_beam.runners.dataflow.internal.apiclient.' 'beam_version.__version__', @@ -706,23 +687,6 @@ def test_worker_harness_image_tag_matches_released_sdk_version(self): '/beam_python%d.%d_sdk:2.2.0' % (sys.version_info[0], sys.version_info[1]))) - # batch, legacy pipeline. - pipeline_options = pipeline_options = PipelineOptions([ - '--temp_location', - 'gs://any-location/temp', - '--experiments=disable_runner_v2_until_v2.50' - ]) - env = apiclient.Environment( - [], #packages - pipeline_options, - '2.0.0', #any environment version - FAKE_PIPELINE_URL) - self.assertEqual( - env.proto.workerPools[0].workerHarnessContainerImage, - ( - names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python%d%d:2.2.0' % - (sys.version_info[0], sys.version_info[1]))) - @mock.patch( 'apache_beam.runners.dataflow.internal.apiclient.' 'beam_version.__version__', @@ -757,23 +721,6 @@ def test_worker_harness_image_tag_matches_base_sdk_version_of_an_rc(self): '/beam_python%d.%d_sdk:2.2.0' % (sys.version_info[0], sys.version_info[1]))) - # batch, legacy pipeline - pipeline_options = pipeline_options = PipelineOptions([ - '--temp_location', - 'gs://any-location/temp', - '--experiments=disable_runner_v2_until_v2.50' - ]) - env = apiclient.Environment( - [], #packages - pipeline_options, - '2.0.0', #any environment version - FAKE_PIPELINE_URL) - self.assertEqual( - env.proto.workerPools[0].workerHarnessContainerImage, - ( - names.DATAFLOW_CONTAINER_IMAGE_REPOSITORY + '/python%d%d:2.2.0' % - (sys.version_info[0], sys.version_info[1]))) - def test_worker_harness_override_takes_precedence_over_sdk_defaults(self): # streaming, fnapi pipeline. pipeline_options = PipelineOptions([ diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py index c0444037de8c..f86306eb276e 100644 --- a/sdks/python/apache_beam/runners/dataflow/internal/names.py +++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py @@ -42,79 +42,3 @@ BEAM_FNAPI_CONTAINER_VERSION = 'beam-master-20230705' DATAFLOW_CONTAINER_IMAGE_REPOSITORY = 'gcr.io/cloud-dataflow/v1beta3' - - -class TransformNames(object): - """For internal use only; no backwards-compatibility guarantees. - - Transform strings as they are expected in the CloudWorkflow protos. - """ - COLLECTION_TO_SINGLETON = 'CollectionToSingleton' - COMBINE = 'CombineValues' - CREATE_PCOLLECTION = 'CreateCollection' - DO = 'ParallelDo' - FLATTEN = 'Flatten' - GROUP = 'GroupByKey' - READ = 'ParallelRead' - WRITE = 'ParallelWrite' - - -class PropertyNames(object): - """For internal use only; no backwards-compatibility guarantees. - - Property strings as they are expected in the CloudWorkflow protos. - """ - # If uses_keyed_state, whether the state can be sharded. - ALLOWS_SHARDABLE_STATE = 'allows_shardable_state' - BIGQUERY_CREATE_DISPOSITION = 'create_disposition' - BIGQUERY_DATASET = 'dataset' - BIGQUERY_EXPORT_FORMAT = 'bigquery_export_format' - BIGQUERY_FLATTEN_RESULTS = 'bigquery_flatten_results' - BIGQUERY_KMS_KEY = 'bigquery_kms_key' - BIGQUERY_PROJECT = 'project' - BIGQUERY_QUERY = 'bigquery_query' - BIGQUERY_SCHEMA = 'schema' - BIGQUERY_TABLE = 'table' - BIGQUERY_USE_LEGACY_SQL = 'bigquery_use_legacy_sql' - BIGQUERY_WRITE_DISPOSITION = 'write_disposition' - DISPLAY_DATA = 'display_data' - ELEMENT = 'element' - ELEMENTS = 'elements' - ENCODING = 'encoding' - FILE_PATTERN = 'filepattern' - FILE_NAME_PREFIX = 'filename_prefix' - FILE_NAME_SUFFIX = 'filename_suffix' - FORMAT = 'format' - INPUTS = 'inputs' - IMPULSE_ELEMENT = 'impulse_element' - NON_PARALLEL_INPUTS = 'non_parallel_inputs' - NUM_SHARDS = 'num_shards' - OUT = 'out' - OUTPUT = 'output' - OUTPUT_INFO = 'output_info' - OUTPUT_NAME = 'output_name' - PARALLEL_INPUT = 'parallel_input' - PIPELINE_PROTO_TRANSFORM_ID = 'pipeline_proto_transform_id' - # If the input element is a key/value pair, then the output element(s) all - # have the same key as the input. - PRESERVES_KEYS = 'preserves_keys' - PUBSUB_ID_LABEL = 'pubsub_id_label' - PUBSUB_SERIALIZED_ATTRIBUTES_FN = 'pubsub_serialized_attributes_fn' - PUBSUB_SUBSCRIPTION = 'pubsub_subscription' - PUBSUB_TIMESTAMP_ATTRIBUTE = 'pubsub_timestamp_label' - PUBSUB_TOPIC = 'pubsub_topic' - RESOURCE_HINTS = 'resource_hints' - RESTRICTION_ENCODING = 'restriction_encoding' - SERIALIZED_FN = 'serialized_fn' - SHARD_NAME_TEMPLATE = 'shard_template' - SOURCE_STEP_INPUT = 'custom_source_step_input' - SERIALIZED_TEST_STREAM = 'serialized_test_stream' - STEP_NAME = 'step_name' - USE_INDEXED_FORMAT = 'use_indexed_format' - USER_FN = 'user_fn' - USER_NAME = 'user_name' - USES_KEYED_STATE = 'uses_keyed_state' - VALIDATE_SINK = 'validate_sink' - VALIDATE_SOURCE = 'validate_source' - VALUE = 'value' - WINDOWING_STRATEGY = 'windowing_strategy' diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py index 1012a7d36240..8004762f5eec 100644 --- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py +++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py @@ -19,101 +19,9 @@ # pytype: skip-file -from apache_beam.options.pipeline_options import StandardOptions from apache_beam.pipeline import PTransformOverride -class CreatePTransformOverride(PTransformOverride): - """A ``PTransformOverride`` for ``Create`` in streaming mode.""" - def matches(self, applied_ptransform): - # Imported here to avoid circular dependencies. - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam import Create - return isinstance(applied_ptransform.transform, Create) - - def get_replacement_transform_for_applied_ptransform( - self, applied_ptransform): - # Imported here to avoid circular dependencies. - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam import PTransform - - ptransform = applied_ptransform.transform - - # Return a wrapper rather than ptransform.as_read() directly to - # ensure backwards compatibility of the pipeline structure. - class LegacyCreate(PTransform): - def expand(self, pbegin): - return pbegin | ptransform.as_read() - - return LegacyCreate().with_output_types(ptransform.get_output_type()) - - -class ReadPTransformOverride(PTransformOverride): - """A ``PTransformOverride`` for ``Read(BoundedSource)``""" - def matches(self, applied_ptransform): - from apache_beam.io import Read - from apache_beam.io.iobase import BoundedSource - # Only overrides Read(BoundedSource) transform - if (isinstance(applied_ptransform.transform, Read) and - not getattr(applied_ptransform.transform, 'override', False)): - if isinstance(applied_ptransform.transform.source, BoundedSource): - return True - return False - - def get_replacement_transform_for_applied_ptransform( - self, applied_ptransform): - - from apache_beam import pvalue - from apache_beam.io import iobase - - transform = applied_ptransform.transform - - class Read(iobase.Read): - override = True - - def expand(self, pbegin): - return pvalue.PCollection( - self.pipeline, is_bounded=self.source.is_bounded()) - - return Read(transform.source).with_output_types( - transform.get_type_hints().simple_output_type('Read')) - - -class CombineValuesPTransformOverride(PTransformOverride): - """A ``PTransformOverride`` for ``CombineValues``. - - The DataflowRunner expects that the CombineValues PTransform acts as a - primitive. So this override replaces the CombineValues with a primitive. - """ - def matches(self, applied_ptransform): - # Imported here to avoid circular dependencies. - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam import CombineValues - - if isinstance(applied_ptransform.transform, CombineValues): - self.transform = applied_ptransform.transform - return True - return False - - def get_replacement_transform(self, ptransform): - # Imported here to avoid circular dependencies. - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam import PTransform - from apache_beam.pvalue import PCollection - - # The DataflowRunner still needs access to the CombineValues members to - # generate a V1B3 proto representation, so we remember the transform from - # the matches method and forward it here. - class CombineValuesReplacement(PTransform): - def __init__(self, transform): - self.transform = transform - - def expand(self, pcoll): - return PCollection.from_(pcoll) - - return CombineValuesReplacement(self.transform) - - class NativeReadPTransformOverride(PTransformOverride): """A ``PTransformOverride`` for ``Read`` using native sources. @@ -150,37 +58,3 @@ def expand(self, pbegin): # will choose the incorrect coder for this transform. return Read(ptransform.source).with_output_types( ptransform.source.coder.to_type_hint()) - - -class GroupIntoBatchesWithShardedKeyPTransformOverride(PTransformOverride): - """A ``PTransformOverride`` for ``GroupIntoBatches.WithShardedKey``. - - This override simply returns the original transform but additionally records - the output PCollection in order to append required step properties during - graph translation. - """ - def __init__(self, dataflow_runner, options): - self.dataflow_runner = dataflow_runner - self.options = options - - def matches(self, applied_ptransform): - # Imported here to avoid circular dependencies. - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam import util - - transform = applied_ptransform.transform - - if not isinstance(transform, util.GroupIntoBatches.WithShardedKey): - return False - - # The replacement is only valid for portable Streaming Engine jobs with - # runner v2. - standard_options = self.options.view_as(StandardOptions) - if not standard_options.streaming: - return False - - self.dataflow_runner.add_pcoll_with_auto_sharding(applied_ptransform) - return True - - def get_replacement_transform_for_applied_ptransform(self, ptransform): - return ptransform.transform