Skip to content

Commit

Permalink
[PYTHON] Add new --auto_unique_labels option to StandardOptions (#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
hjtran authored Oct 24, 2023
1 parent 8a827e8 commit 48722f1
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 6 deletions.
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,14 @@ def _add_argparse_args(cls, parser):
'at transform level. Interpretation of hints is defined by '
'Beam runners.'))

parser.add_argument(
'--auto_unique_labels',
default=False,
action='store_true',
help='Whether to automatically generate unique transform labels '
'for every transform. The default behavior is to raise an '
'exception if a transform is created with a non-unique label.')


class CrossLanguageOptions(PipelineOptions):
@classmethod
Expand Down
33 changes: 27 additions & 6 deletions sdks/python/apache_beam/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import shutil
import tempfile
import unicodedata
import uuid
from collections import defaultdict
from typing import TYPE_CHECKING
from typing import Any
Expand Down Expand Up @@ -681,13 +682,20 @@ def apply(
alter_label_if_ipython(transform, pvalueish)

full_label = '/'.join(
[self._current_transform().full_label, label or
transform.label]).lstrip('/')
[self._current_transform().full_label, transform.label]).lstrip('/')
if full_label in self.applied_labels:
raise RuntimeError(
'A transform with label "%s" already exists in the pipeline. '
'To apply a transform with a specified label write '
'pvalue | "label" >> transform' % full_label)
auto_unique_labels = self._options.view_as(
StandardOptions).auto_unique_labels
if auto_unique_labels:
# If auto_unique_labels is set, we will append a unique suffix to the
# label to make it unique.
unique_label = self._generate_unique_label(transform)
return self.apply(transform, pvalueish, unique_label)
else:
raise RuntimeError(
'A transform with label "%s" already exists in the pipeline. '
'To apply a transform with a specified label write '
'pvalue | "label" >> transform' % full_label)
self.applied_labels.add(full_label)

pvalueish, inputs = transform._extract_input_pvalues(pvalueish)
Expand Down Expand Up @@ -763,6 +771,19 @@ def apply(
self.transforms_stack.pop()
return pvalueish_result

def _generate_unique_label(
self,
transform # type: str
):
# type: (...) -> str

"""
Given a transform, generate a unique label for it based on current label.
"""
unique_suffix = uuid.uuid4().hex[:6]
return '%s_%s' % (transform.label, unique_suffix)


def _infer_result_type(
self,
transform, # type: ptransform.PTransform
Expand Down
27 changes: 27 additions & 0 deletions sdks/python/apache_beam/pipeline_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import copy
import platform
import unittest
import uuid

import mock
import pytest
Expand Down Expand Up @@ -266,6 +267,32 @@ def test_reuse_custom_transform_instance(self):
'pipeline. To apply a transform with a specified label write '
'pvalue | "label" >> transform')

def test_auto_unique_labels(self):
  """Checks that --auto_unique_labels de-duplicates repeated labels.

  Three ``Map(identity)`` transforms are applied without explicit labels.
  The first keeps the plain label; the other two are expected to receive a
  suffix built from the first six characters of a (mocked) UUID hex.
  """
  opts = PipelineOptions(["--auto_unique_labels"])
  with mock.patch.object(uuid, 'uuid4') as mock_uuid_gen:
    # Two colliding applications are expected, so two fake UUIDs are
    # supplied. The trailing 'XXX' verifies that only a prefix is used.
    mock_uuids = [mock.Mock(hex='UUID01XXX'), mock.Mock(hex='UUID02XXX')]
    mock_uuid_gen.side_effect = mock_uuids
    with TestPipeline(options=opts) as pipeline:
      pcoll = pipeline | 'pcoll' >> Create([1, 2, 3])

      def identity(x):
        return x

      pcoll2 = pcoll | Map(identity)
      pcoll3 = pcoll2 | Map(identity)
      pcoll4 = pcoll3 | Map(identity)
      assert_that(pcoll4, equal_to([1, 2, 3]))

  map_id_full_labels = {
      label
      for label in pipeline.applied_labels if "Map(identity)" in label
  }
  # Full labels are '/'-joined paths of enclosing transform labels, so the
  # leaf label is whatever follows the last '/'. (Splitting on ':' never
  # separated anything.)
  map_id_leaf_labels = {
      label.rsplit("/", 1)[-1]
      for label in map_id_full_labels
  }
  # Only the first 6 chars of the UUID hex should be used
  assert map_id_leaf_labels == {
      "Map(identity)", "Map(identity)_UUID01", "Map(identity)_UUID02"
  }

def test_reuse_cloned_custom_transform_instance(self):
with TestPipeline() as pipeline:
pcoll1 = pipeline | 'pc1' >> Create([1, 2, 3])
Expand Down

0 comments on commit 48722f1

Please sign in to comment.