Skip to content

Commit

Permalink
minor refinements
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Nov 1, 2023
1 parent 128d307 commit b0867b1
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected SchemaTransform from(Configuration configuration) {

@Override
public String identifier() {
return String.format("beam:schematransform:org.apache.beam:yaml:explode-java:v1");
return "beam:schematransform:org.apache.beam:yaml:explode:v1";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ protected SchemaTransform from(Configuration configuration) {

@Override
public String identifier() {
return String.format("beam:schematransform:org.apache.beam:yaml:filter-java:v1");
return "beam:schematransform:org.apache.beam:yaml:filter-java:v1";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ protected SchemaTransform from(Configuration configuration) {

@Override
public String identifier() {
return String.format("beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1");
return "beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1";
}

@Override
Expand Down
5 changes: 2 additions & 3 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,6 @@ def expand(pcoll, error_handling=None, **kwargs):
return expand


# TODO(yaml): This should be available in all environments, in which case
# we choose the one that matches best.
class _Explode(beam.PTransform):
def __init__(
self,
Expand Down Expand Up @@ -289,11 +287,12 @@ def explode_zip(base, fields):
copy[field] = values[ix]
yield beam.Row(**copy)

cross_product = self._cross_product
return (
pcoll
| beam.FlatMap(
lambda row:
(explode_cross_product if self._cross_product else explode_zip)
(explode_cross_product if cross_product else explode_zip)
({name: getattr(row, name)
for name in all_fields}, to_explode)))

Expand Down

0 comments on commit b0867b1

Please sign in to comment.