Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor and cleanup yaml MapToFields. #28462

Merged
merged 7 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2258,6 +2258,10 @@ def __init__(self, pcoll, exception_handling_args, upstream_errors=()):
self._exception_handling_args = exception_handling_args
self._upstream_errors = upstream_errors

@property
def element_type(self):
return self._pcoll.element_type

def main_output_tag(self):
return self._exception_handling_args.get('main_tag', 'good')

Expand Down Expand Up @@ -2309,6 +2313,10 @@ def __init__(self, pvalue, exception_handling_args=None):
else:
self._pvalue = _PValueWithErrors(pvalue, exception_handling_args)

@property
def element_type(self):
return self._pvalue.element_type

def __or__(self, transform):
return self.apply(transform)

Expand Down
23 changes: 14 additions & 9 deletions sdks/python/apache_beam/yaml/readme_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.typehints import trivial_inference
from apache_beam.yaml import yaml_mapping
from apache_beam.yaml import yaml_provider
from apache_beam.yaml import yaml_transform

Expand Down Expand Up @@ -85,13 +86,16 @@ def guess_name_and_type(expr):
typ, = [t for t in typ.__args__ if t is not type(None)]
return name, typ

output_schema = [
guess_name_and_type(expr) for expr in m.group(1).split(',')
]
output_element = beam.Row(**{name: typ() for name, typ in output_schema})
return next(iter(inputs.values())) | beam.Map(
lambda _: output_element).with_output_types(
trivial_inference.instance_to_type(output_element))
if m.group(1) == '*':
return inputs['PCOLLECTION'] | beam.Filter(lambda _: True)
else:
output_schema = [
guess_name_and_type(expr) for expr in m.group(1).split(',')
]
output_element = beam.Row(**{name: typ() for name, typ in output_schema})
return next(iter(inputs.values())) | beam.Map(
lambda _: output_element).with_output_types(
trivial_inference.instance_to_type(output_element))


class FakeReadFromPubSub(beam.PTransform):
Expand Down Expand Up @@ -204,12 +208,13 @@ def test(self):
]
options['render_leaf_composite_nodes'] = ['.*']
test_provider = TestProvider(TEST_TRANSFORMS)
test_sql_mapping_provider = yaml_mapping.SqlMappingProvider(test_provider)
p = beam.Pipeline(options=PipelineOptions(**options))
yaml_transform.expand_pipeline(
p,
modified_yaml,
{t: test_provider
for t in test_provider.provided_transforms()})
yaml_provider.merge_providers(
[test_provider, test_sql_mapping_provider]))
if test_type == 'BUILD':
return
p.run().wait_until_finish()
Expand Down
35 changes: 14 additions & 21 deletions sdks/python/apache_beam/yaml/yaml_mapping.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ Currently, in addition to Python, SQL expressions are supported as well

Sometimes it may be desirable to emit more (or less) than one record for each
input record. This can be accomplished by mapping to an iterable type and
noting that the specific field should be exploded, e.g.
following the mapping with an Explode operation, e.g.

```
- type: MapToFields
Expand All @@ -140,7 +140,9 @@ noting that the specific field should be exploded, e.g.
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "col2 + col3"
explode: new_col
- type: Explode
config:
fields: new_col
```

will result in three output records for every input record.
Expand All @@ -155,7 +157,9 @@ product over all fields should be taken. For example
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "[col2 - 1, col2, col2 + 1]"
explode: [new_col, another_col]
- type: Explode
config:
fields: [new_col, another_col]
cross_product: true
```

Expand All @@ -168,38 +172,27 @@ will emit nine records whereas
fields:
new_col: "[col1.upper(), col1.lower(), col1.title()]"
another_col: "[col2 - 1, col2, col2 + 1]"
explode: [new_col, another_col]
- type: Explode
config:
fields: [new_col, another_col]
cross_product: false
```

will only emit three.

If one is only exploding existing fields, a simpler `Explode` transform may be
used instead
The `Explode` operation can be used on its own if the field in question is
already an iterable type.

```
- type: Explode
config:
explode: [col1]
fields: [col1]
```

## Filtering

Sometimes it can be desirable to only keep records that satisfy a certain
criteria. This can be accomplished by specifying a keep parameter, e.g.

```
- type: MapToFields
config:
language: python
fields:
new_col: "col1.upper()"
another_col: "col2 + col3"
keep: "col2 > 0"
```

Like explode, there is a simpler `Filter` transform useful when no mapping is
being done
criteria. This can be accomplished with a `Filter` transform, e.g.

```
- type: Filter
Expand Down
Loading
Loading