Skip to content

Commit

Permalink
fixup: Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mravi committed Dec 7, 2024
1 parent 1f5dd82 commit 1199e5b
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 20 deletions.
27 changes: 8 additions & 19 deletions sdks/python/apache_beam/yaml/yaml_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -956,42 +956,31 @@ def preprocess_languages(spec):
return spec

def validate_transform_references(spec):
if 'transforms' not in spec:
return spec

for transform in spec['transforms']:
name = transform.get('name')
inputs = transform.get('input')
if name is None or inputs is None:
continue

input_values = []
if isinstance(inputs, str):
input_values = [inputs]
elif isinstance(inputs, list):
input_values = inputs
elif isinstance(inputs, dict):
input_values = list(inputs.values())
name = spec.get('name', '')
transform_type = spec.get('type')
inputs = spec.get('input').get('input', [])

if not is_empty(inputs):
input_values = [inputs] if isinstance(inputs, str) else inputs
for input_value in input_values:
if isinstance(input_value, str) and input_value.lower() == name.lower():
if input_value in (name, transform_type):
raise ValueError(
f"Circular reference detected: Transform {name} "
f"references itself as input in {identify_object(transform)}")
f"references itself as input in {identify_object(spec)}")

return spec

for phase in [
ensure_transforms_have_types,
normalize_mapping,
normalize_combine,
validate_transform_references,
preprocess_languages,
ensure_transforms_have_providers,
preprocess_source_sink,
preprocess_chain,
tag_explicit_inputs,
normalize_inputs_outputs,
validate_transform_references,
preprocess_flattened_inputs,
ensure_errors_consumed,
preprocess_windowing,
Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/yaml/yaml_transform_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ def test_name_is_ambiguous(self):
with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
pickle_library='cloudpickle')) as p:
# pylint: disable=expression-not-assigned
with self.assertRaisesRegex(ValueError, r'Ambiguous.*'):
with self.assertRaisesRegex(ValueError, r'Circular reference detected.*'):
p | YamlTransform(
'''
type: composite
Expand Down

0 comments on commit 1199e5b

Please sign in to comment.