diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 28614c6561c7..fcff86d4c50c 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -1101,6 +1101,22 @@ def __ror__(self, pvalueish, _unused=None):
   def expand(self, pvalue):
     raise RuntimeError("Should never be expanded directly.")

+  def __getattr__(self, attr):
+    transform_attr = getattr(self.transform, attr)
+    if callable(transform_attr):
+
+      @wraps(transform_attr)
+      def wrapper(*args, **kwargs):
+        result = transform_attr(*args, **kwargs)
+        if isinstance(result, PTransform):
+          return _NamedPTransform(result, self.label)
+        else:
+          return result
+
+      return wrapper
+    else:
+      return transform_attr
+

 # Defined here to avoid circular import issues for Beam library transforms.
 def annotate_yaml(constructor):
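The new `__getattr__` lets a labeled `_NamedPTransform` forward attribute access to the transform it wraps, re-wrapping any `PTransform` returned by a builder-style method so the original label survives. A minimal sketch of the effect (variable names are illustrative; `with_exception_handling` is an existing `ParDo` builder method, used here only as an example):

```python
import apache_beam as beam

# 'ParseInts' >> ... wraps the Map transform in a _NamedPTransform.
named = 'ParseInts' >> beam.Map(int)

# Attribute access is delegated to the wrapped transform. Because
# with_exception_handling returns a PTransform, the result is re-wrapped
# so it still carries the 'ParseInts' label.
robust = named.with_exception_handling()
```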
diff --git a/sdks/python/apache_beam/yaml/json_utils.py b/sdks/python/apache_beam/yaml/json_utils.py
new file mode 100644
index 000000000000..e2cb03dc96a0
--- /dev/null
+++ b/sdks/python/apache_beam/yaml/json_utils.py
@@ -0,0 +1,183 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Utilities for converting between JSON and Beam Schema'd data.
+
+For internal use, no backward compatibility guarantees.
+"""
+
+import json
+from typing import Any
+from typing import Callable
+from typing import Dict
+
+import apache_beam as beam
+from apache_beam.portability.api import schema_pb2
+from apache_beam.typehints import schemas
+
+JSON_ATOMIC_TYPES_TO_BEAM = {
+    'boolean': schema_pb2.BOOLEAN,
+    'integer': schema_pb2.INT64,
+    'number': schema_pb2.DOUBLE,
+    'string': schema_pb2.STRING,
+}
+
+
+def json_schema_to_beam_schema(
+    json_schema: Dict[str, Any]) -> schema_pb2.Schema:
+  """Returns a Beam schema equivalent for the given Json schema."""
+  def maybe_nullable(beam_type, nullable):
+    if nullable:
+      beam_type.nullable = True
+    return beam_type
+
+  json_type = json_schema.get('type', None)
+  if json_type != 'object':
+    raise ValueError(f'Expected object type, got {json_type}.')
+  if 'properties' not in json_schema:
+    # Technically this is a valid (vacuous) schema, but as it's not generally
+    # meaningful, throw an informative error instead.
+    # (We could add a flag to allow this degenerate case.)
+    raise ValueError(f'Missing properties for {json_schema}.')
+  required = set(json_schema.get('required', []))
+  return schema_pb2.Schema(
+      fields=[
+          schemas.schema_field(
+              name,
+              maybe_nullable(json_type_to_beam_type(t), name not in required))
+          for (name, t) in json_schema['properties'].items()
+      ])
+
+
+def json_type_to_beam_type(json_type: Dict[str, Any]) -> schema_pb2.FieldType:
+  """Returns a Beam schema type for the given Json (schema) type."""
+  if not isinstance(json_type, dict) or 'type' not in json_type:
+    raise ValueError(f'Malformed type {json_type}.')
+  type_name = json_type['type']
+  if type_name in JSON_ATOMIC_TYPES_TO_BEAM:
+    return schema_pb2.FieldType(
+        atomic_type=JSON_ATOMIC_TYPES_TO_BEAM[type_name])
+  elif type_name == 'array':
+    return schema_pb2.FieldType(
+        array_type=schema_pb2.ArrayType(
+            element_type=json_type_to_beam_type(json_type['items'])))
+  elif type_name == 'object':
+    if 'properties' in json_type:
+      return schema_pb2.FieldType(
+          row_type=schema_pb2.RowType(
+              schema=json_schema_to_beam_schema(json_type)))
+    elif 'additionalProperties' in json_type:
+      return schema_pb2.FieldType(
+          map_type=schema_pb2.MapType(
+              key_type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING),
+              value_type=json_type_to_beam_type(
+                  json_type['additionalProperties'])))
+    else:
+      raise ValueError(
+          f'Object type must have either properties or additionalProperties, '
+          f'got {json_type}.')
+  else:
+    raise ValueError(f'Unable to convert {json_type} to a Beam schema.')
+
+
+def json_to_row(beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]:
+  """Returns a callable converting Json objects to Beam rows of the given type.
+
+  The input to the returned callable is expected to conform to the Json schema
+  corresponding to this Beam type.
+  """
+  type_info = beam_type.WhichOneof("type_info")
+  if type_info == "atomic_type":
+    return lambda value: value
+  elif type_info == "array_type":
+    element_converter = json_to_row(beam_type.array_type.element_type)
+    return lambda value: [element_converter(e) for e in value]
+  elif type_info == "iterable_type":
+    element_converter = json_to_row(beam_type.iterable_type.element_type)
+    return lambda value: [element_converter(e) for e in value]
+  elif type_info == "map_type":
+    if beam_type.map_type.key_type.atomic_type != schema_pb2.STRING:
+      raise TypeError(
+          f'Only strings allowed as map keys when converting from JSON, '
+          f'found {beam_type}')
+    value_converter = json_to_row(beam_type.map_type.value_type)
+    return lambda value: {k: value_converter(v) for (k, v) in value.items()}
+  elif type_info == "row_type":
+    converters = {
+        field.name: json_to_row(field.type)
+        for field in beam_type.row_type.schema.fields
+    }
+    return lambda value: beam.Row(
+        **
+        {name: convert(value[name])
+         for (name, convert) in converters.items()})
+  elif type_info == "logical_type":
+    return lambda value: value
+  else:
+    raise ValueError(f"Unrecognized type_info: {type_info!r}")
+
+
+def json_parser(beam_schema: schema_pb2.Schema) -> Callable[[bytes], beam.Row]:
+  """Returns a callable converting Json strings to Beam rows of the given type.
+
+  The input to the returned callable is expected to conform to the Json schema
+  corresponding to this Beam type.
+  """
+  to_row = json_to_row(
+      schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
+  return lambda s: to_row(json.loads(s))
+
+
+def row_to_json(beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]:
+  """Returns a callable converting rows of the given type to Json objects."""
+  type_info = beam_type.WhichOneof("type_info")
+  if type_info == "atomic_type":
+    return lambda value: value
+  elif type_info == "array_type":
+    element_converter = row_to_json(beam_type.array_type.element_type)
+    return lambda value: [element_converter(e) for e in value]
+  elif type_info == "iterable_type":
+    element_converter = row_to_json(beam_type.iterable_type.element_type)
+    return lambda value: [element_converter(e) for e in value]
+  elif type_info == "map_type":
+    if beam_type.map_type.key_type.atomic_type != schema_pb2.STRING:
+      raise TypeError(
+          f'Only strings allowed as map keys when converting to JSON, '
+          f'found {beam_type}')
+    value_converter = row_to_json(beam_type.map_type.value_type)
+    return lambda value: {k: value_converter(v) for (k, v) in value.items()}
+  elif type_info == "row_type":
+    converters = {
+        field.name: row_to_json(field.type)
+        for field in beam_type.row_type.schema.fields
+    }
+    return lambda row: {
+        name: convert(getattr(row, name))
+        for (name, convert) in converters.items()
+    }
+  elif type_info == "logical_type":
+    return lambda value: value
+  else:
+    raise ValueError(f"Unrecognized type_info: {type_info!r}")
+
+
+def json_formater(
+    beam_schema: schema_pb2.Schema) -> Callable[[beam.Row], bytes]:
+  """Returns a callable converting rows of the given schema to Json strings."""
+  convert = row_to_json(
+      schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
+  return lambda row: json.dumps(convert(row), sort_keys=True).encode('utf-8')
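Taken together, these helpers give a schema-driven round trip between JSON bytes and `beam.Row`s. A sketch of how they compose, assuming the module as added above (the example schema and values are illustrative, not from the patch):

```python
from apache_beam.yaml import json_utils

# An object schema with one required field and one optional
# (hence nullable) field.
json_schema = {
    'type': 'object',
    'properties': {
        'x': {'type': 'integer'},
        'y': {'type': 'integer'},
    },
    'required': ['x'],
}
beam_schema = json_utils.json_schema_to_beam_schema(json_schema)

# Parse JSON bytes into a Beam row.
parse = json_utils.json_parser(beam_schema)
row = parse(b'{"x": 1, "y": 2}')  # beam.Row(x=1, y=2)

# Format the row back to JSON bytes (keys sorted by sort_keys=True).
unparse = json_utils.json_formater(beam_schema)
assert unparse(row) == b'{"x": 1, "y": 2}'
```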
+ """ + to_row = json_to_row( + schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema))) + return lambda s: to_row(json.loads(s)) + + +def row_to_json(beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]: + """Returns a callable converting rows of the given type to Json objects.""" + type_info = beam_type.WhichOneof("type_info") + if type_info == "atomic_type": + return lambda value: value + elif type_info == "array_type": + element_converter = row_to_json(beam_type.array_type.element_type) + return lambda value: [element_converter(e) for e in value] + elif type_info == "iterable_type": + element_converter = row_to_json(beam_type.iterable_type.element_type) + return lambda value: [element_converter(e) for e in value] + elif type_info == "map_type": + if beam_type.map_type.key_type.atomic_type != schema_pb2.STRING: + raise TypeError( + f'Only strings allowd as map keys when converting to JSON, ' + f'found {beam_type}') + value_converter = row_to_json(beam_type.map_type.value_type) + return lambda value: {k: value_converter(v) for (k, v) in value.items()} + elif type_info == "row_type": + converters = { + field.name: row_to_json(field.type) + for field in beam_type.row_type.schema.fields + } + return lambda row: { + name: convert(getattr(row, name)) + for (name, convert) in converters.items() + } + elif type_info == "logical_type": + return lambda value: value + else: + raise ValueError(f"Unrecognized type_info: {type_info!r}") + + +def json_formater( + beam_schema: schema_pb2.Schema) -> Callable[[beam.Row], bytes]: + """Returns a callable converting rows of the given schema to Json strings.""" + convert = row_to_json( + schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema))) + return lambda row: json.dumps(convert(row), sort_keys=True).encode('utf-8') diff --git a/sdks/python/apache_beam/yaml/yaml_io.py b/sdks/python/apache_beam/yaml/yaml_io.py index 4a1d12490057..3321644ded57 100644 --- a/sdks/python/apache_beam/yaml/yaml_io.py +++ b/sdks/python/apache_beam/yaml/yaml_io.py @@ -25,10 +25,12 @@ import os from typing import Any +from typing import Callable from typing import Iterable from typing import List from typing import Mapping from typing import Optional +from typing import Tuple import yaml @@ -39,6 +41,7 @@ from apache_beam.io.gcp.bigquery import BigQueryDisposition from apache_beam.portability.api import schema_pb2 from apache_beam.typehints import schemas +from apache_beam.yaml import json_utils from apache_beam.yaml import yaml_mapping from apache_beam.yaml import yaml_provider @@ -131,18 +134,25 @@ def raise_exception(failed_row_with_error): return WriteToBigQueryHandlingErrors() -def _create_parser(format, schema): +def _create_parser( + format, + schema: Any) -> Tuple[schema_pb2.Schema, Callable[[bytes], beam.Row]]: if format == 'raw': if schema: raise ValueError('raw format does not take a schema') return ( schema_pb2.Schema(fields=[schemas.schema_field('payload', bytes)]), lambda payload: beam.Row(payload=payload)) + elif format == 'json': + beam_schema = json_utils.json_schema_to_beam_schema(schema) + return beam_schema, json_utils.json_parser(beam_schema) else: raise ValueError(f'Unknown format: {format}') -def _create_formatter(format, schema, beam_schema): +def _create_formatter( + format, schema: Any, + beam_schema: schema_pb2.Schema) -> Callable[[beam.Row], bytes]: if format == 'raw': if schema: raise ValueError('raw format does not take a schema') @@ -150,6 +160,8 @@ def _create_formatter(format, schema, beam_schema): if 
diff --git a/sdks/python/apache_beam/yaml/yaml_io_test.py b/sdks/python/apache_beam/yaml/yaml_io_test.py
index ab6298661c15..72675da278b0 100644
--- a/sdks/python/apache_beam/yaml/yaml_io_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_io_test.py
@@ -167,6 +167,101 @@ def test_read_with_id_attribute(self):
         result,
         equal_to([beam.Row(payload=b'msg1'), beam.Row(payload=b'msg2')]))

+  def test_read_json(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      with mock.patch('apache_beam.io.ReadFromPubSub',
+                      FakeReadFromPubSub(
+                          topic='my_topic',
+                          messages=[PubsubMessage(
+                              b'{"generator": {"x": 0, "y": 0}, "rank": 1}',
+                              {'weierstrass': 'y^2+y=x^3-x', 'label': '37a'})
+                          ])):
+        result = p | YamlTransform(
+            '''
+            type: ReadFromPubSub
+            config:
+              topic: my_topic
+              format: json
+              schema:
+                type: object
+                properties:
+                  generator:
+                    type: object
+                    properties:
+                      x: {type: integer}
+                      y: {type: integer}
+                  rank: {type: integer}
+              attributes: [label]
+              attributes_map: other
+            ''')
+        assert_that(
+            result,
+            equal_to([
+                beam.Row(
+                    generator=beam.Row(x=0, y=0),
+                    rank=1,
+                    label='37a',
+                    other={
+                        'label': '37a', 'weierstrass': 'y^2+y=x^3-x'
+                    })
+            ]))
+
+  def test_read_json_with_error_handling(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      with mock.patch(
+          'apache_beam.io.ReadFromPubSub',
+          FakeReadFromPubSub(topic='my_topic',
+                             messages=[PubsubMessage('{"some_int": 123}',
+                                                     attributes={}),
+                                       PubsubMessage('unparsable',
+                                                     attributes={})])):
+        result = p | YamlTransform(
+            '''
+            type: ReadFromPubSub
+            config:
+              topic: my_topic
+              format: json
+              schema:
+                type: object
+                properties:
+                  some_int: {type: integer}
+              error_handling:
+                output: errors
+            ''')
+        assert_that(
+            result['good'],
+            equal_to([beam.Row(some_int=123)]),
+            label='CheckGood')
+        assert_that(
+            result['errors'] | beam.Map(lambda error: error.element),
+            equal_to(['unparsable']),
+            label='CheckErrors')
+
+  def test_read_json_without_error_handling(self):
+    with self.assertRaises(Exception):
+      with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+          pickle_library='cloudpickle')) as p:
+        with mock.patch(
+            'apache_beam.io.ReadFromPubSub',
+            FakeReadFromPubSub(topic='my_topic',
+                               messages=[PubsubMessage('{"some_int": 123}',
+                                                       attributes={}),
+                                         PubsubMessage('unparsable',
+                                                       attributes={})])):
+          _ = p | YamlTransform(
+              '''
+              type: ReadFromPubSub
+              config:
+                topic: my_topic
+                format: json
+                schema:
+                  type: object
+                  properties:
+                    some_int: {type: integer}
+              ''')
+
   def test_simple_write(self):
     with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
         pickle_library='cloudpickle')) as p:
@@ -251,6 +346,34 @@ def test_write_with_id_attribute(self):
           id_attribute: some_attr
           '''))

+  def test_write_json(self):
+    with beam.Pipeline(options=beam.options.pipeline_options.PipelineOptions(
+        pickle_library='cloudpickle')) as p:
+      with mock.patch('apache_beam.io.WriteToPubSub',
+                      FakeWriteToPubSub(
+                          topic='my_topic',
+                          messages=[PubsubMessage(
+                              b'{"generator": {"x": 0, "y": 0}, "rank": 1}',
+                              {'weierstrass': 'y^2+y=x^3-x', 'label': '37a'})
+                          ])):
+        _ = (
+            p | beam.Create([
+                beam.Row(
+                    label='37a',
+                    generator=beam.Row(x=0, y=0),
+                    rank=1,
+                    other={'weierstrass': 'y^2+y=x^3-x'})
+            ]) | YamlTransform(
+                '''
+                type: WriteToPubSub
+                input: input
+                config:
+                  topic: my_topic
+                  format: json
+                  attributes: [label]
+                  attributes_map: other
+                '''))
+

 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
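As the error-handling tests above show, a message that fails JSON parsing is routed to the configured error output instead of failing the pipeline, and the original payload stays reachable via the error record. A sketch of the consuming side, assuming the output names used in the tests (`good`, `errors`) and the `element` attribute on the error record:

```python
import apache_beam as beam

def split_errors(result):
  """Illustrative helper: separate parsed rows from failed raw payloads.

  `result` is the multi-output of a ReadFromPubSub YamlTransform configured
  with format: json and error_handling: {output: errors}, as in the tests.
  """
  good = result['good']
  bad_payloads = result['errors'] | beam.Map(lambda error: error.element)
  return good, bad_payloads
```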