From 48722f1bfb4b0e81d29c8ba595944a3828af877f Mon Sep 17 00:00:00 2001 From: Hai Joey Tran Date: Tue, 24 Oct 2023 19:54:16 -0400 Subject: [PATCH] [PYTHON] Add new `--auto_unique_labels` option to StandardOptions (#28984) --- .../apache_beam/options/pipeline_options.py | 8 +++++ sdks/python/apache_beam/pipeline.py | 33 +++++++++++++++---- sdks/python/apache_beam/pipeline_test.py | 27 +++++++++++++++ 3 files changed, 62 insertions(+), 6 deletions(-) diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 3fbf7eff7dd6..76b776779cf9 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -515,6 +515,14 @@ def _add_argparse_args(cls, parser): 'at transform level. Interpretation of hints is defined by ' 'Beam runners.')) + parser.add_argument( + '--auto_unique_labels', + default=False, + action='store_true', + help='Whether to automatically generate unique transform labels ' + 'for every transform. The default behavior is to raise an ' + 'exception if a transform is created with a non-unique label.') + class CrossLanguageOptions(PipelineOptions): @classmethod diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index f52616307e7b..ed0736250d1f 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -54,6 +54,7 @@ import shutil import tempfile import unicodedata +import uuid from collections import defaultdict from typing import TYPE_CHECKING from typing import Any @@ -681,13 +682,20 @@ def apply( alter_label_if_ipython(transform, pvalueish) full_label = '/'.join( - [self._current_transform().full_label, label or - transform.label]).lstrip('/') + [self._current_transform().full_label, transform.label]).lstrip('/') if full_label in self.applied_labels: - raise RuntimeError( - 'A transform with label "%s" already exists in the pipeline. ' - 'To apply a transform with a specified label write ' - 'pvalue | "label" >> transform' % full_label) + auto_unique_labels = self._options.view_as( + StandardOptions).auto_unique_labels + if auto_unique_labels: + # If auto_unique_labels is set, we will append a unique suffix to the + # label to make it unique. + unique_label = self._generate_unique_label(transform) + return self.apply(transform, pvalueish, unique_label) + else: + raise RuntimeError( + 'A transform with label "%s" already exists in the pipeline. ' + 'To apply a transform with a specified label write ' + 'pvalue | "label" >> transform' % full_label) self.applied_labels.add(full_label) pvalueish, inputs = transform._extract_input_pvalues(pvalueish) @@ -763,6 +771,19 @@ def apply( self.transforms_stack.pop() return pvalueish_result + def _generate_unique_label( + self, + transform # type: str + ): + # type: (...) -> str + + """ + Given a transform, generate a unique label for it based on current label. + """ + unique_suffix = uuid.uuid4().hex[:6] + return '%s_%s' % (transform.label, unique_suffix) + + def _infer_result_type( self, transform, # type: ptransform.PTransform diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index c9ac4ce4c13d..113d1a99990c 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -22,6 +22,7 @@ import copy import platform import unittest +import uuid import mock import pytest @@ -266,6 +267,32 @@ def test_reuse_custom_transform_instance(self): 'pipeline. To apply a transform with a specified label write ' 'pvalue | "label" >> transform') + def test_auto_unique_labels(self): + + opts = PipelineOptions(["--auto_unique_labels"]) + with mock.patch.object(uuid, 'uuid4') as mock_uuid_gen: + mock_uuids = [mock.Mock(hex='UUID01XXX'), mock.Mock(hex='UUID02XXX')] + mock_uuid_gen.side_effect = mock_uuids + with TestPipeline(options=opts) as pipeline: + pcoll = pipeline | 'pcoll' >> Create([1, 2, 3]) + + def identity(x): + return x + + pcoll2 = pcoll | Map(identity) + pcoll3 = pcoll2 | Map(identity) + pcoll4 = pcoll3 | Map(identity) + assert_that(pcoll4, equal_to([1, 2, 3])) + + map_id_full_labels = { + label + for label in pipeline.applied_labels if "Map(identity)" in label + } + map_id_leaf_labels = {label.split(":")[-1] for label in map_id_full_labels} + # Only the first 6 chars of the UUID hex should be used + assert map_id_leaf_labels == set( + ["Map(identity)", "Map(identity)_UUID01", "Map(identity)_UUID02"]) + def test_reuse_cloned_custom_transform_instance(self): with TestPipeline() as pipeline: pcoll1 = pipeline | 'pc1' >> Create([1, 2, 3])