diff --git a/sdks/python/apache_beam/yaml/yaml_join.py b/sdks/python/apache_beam/yaml/yaml_join.py index 5124ef56b49c..b22e452b27f9 100644 --- a/sdks/python/apache_beam/yaml/yaml_join.py +++ b/sdks/python/apache_beam/yaml/yaml_join.py @@ -62,9 +62,11 @@ def _validate_equalities(equalities, pcolls): error_prefix = f'Invalid value "{equalities}" for "equalities".' valid_cols = { - name: set(dict(pcoll.element_type._fields).keys()) - for name, - pcoll in pcolls.items() + name: set( + dict(fields).keys() if fields and all( + isinstance(field, tuple) for field in fields) else fields) + for (name, pcoll) in pcolls.items() + for fields in [getattr(pcoll.element_type, '_fields', [])] } if isinstance(equalities, str):