
Merge pull request #29830 [YAML] More complete schema validation.
robertwb authored Jan 12, 2024
2 parents c38dc77 + 1c7f839 commit 7cb559f
Showing 4 changed files with 137 additions and 14 deletions.
sdks/python/apache_beam/yaml/generate_yaml_docs.py (46 additions, 4 deletions)

@@ -16,12 +16,14 @@
 #
 
 import argparse
+import contextlib
 import re
 
 import yaml
 
 from apache_beam.portability.api import schema_pb2
 from apache_beam.utils import subprocess_server
+from apache_beam.yaml import json_utils
 from apache_beam.yaml import yaml_provider

@@ -166,7 +168,8 @@ def transform_docs(t, providers):

 def main():
   parser = argparse.ArgumentParser()
-  parser.add_argument('output_file')
+  parser.add_argument('--markdown_file')
+  parser.add_argument('--schema_file')
   parser.add_argument('--include', default='.*')
   parser.add_argument(
       '--exclude', default='(Combine)|(Filter)|(MapToFields)-.*')
@@ -175,13 +178,52 @@ def main():
   exclude = re.compile(options.exclude).match
 
   with subprocess_server.SubprocessServer.cache_subprocesses():
-    with open(options.output_file, 'w') as fout:
+    json_config_schemas = []
+    with contextlib.ExitStack() as stack:
+      if options.markdown_file:
+        markdown_out = stack.enter_context(open(options.markdown_file, 'w'))
       providers = yaml_provider.standard_providers()
       for transform in sorted(providers.keys(), key=io_grouping_key):
         if include(transform) and not exclude(transform):
           print(transform)
-          fout.write(transform_docs(transform, providers[transform]))
-          fout.write('\n\n')
+          if options.markdown_file:
+            markdown_out.write(transform_docs(transform, providers[transform]))
+            markdown_out.write('\n\n')
+          if options.schema_file:
+            schema = providers[transform][0].config_schema(transform)
+            if schema:
+              json_config_schemas.append({
+                  'if': {
+                      'properties': {
+                          'type': {
+                              'const': transform
+                          }
+                      }
+                  },
+                  'then': {
+                      'properties': {
+                          'config': {
+                              'type': 'object',
+                              'properties': {
+                                  '__line__': {
+                                      'type': 'integer'
+                                  },
+                                  '__uuid__': {},
+                                  **{
+                                      f.name: json_utils.beam_type_to_json_type(
+                                          f.type)
+                                      for f in schema.fields
+                                  }
+                              },
+                              'additionalProperties': False,
+                          }
+                      }
+                  }
+              })
+
+  if options.schema_file:
+    with open(options.schema_file, 'w') as fout:
+      yaml.dump(json_config_schemas, fout, sort_keys=False)
 
 
 if __name__ == '__main__':
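Each generated entry above is a JSON Schema if/then conditional: when a transform's type matches the const, its config block is constrained to the transform's declared fields plus the __line__/__uuid__ bookkeeping keys added by the line-tracking YAML loader, and additionalProperties: False turns any misspelled option into a validation error. A minimal sketch of how one such entry behaves under the jsonschema library; the transform name 'MyTransform' and its 'path' field are hypothetical, not from this PR:

    import jsonschema

    entry = {
        'if': {'properties': {'type': {'const': 'MyTransform'}}},
        'then': {
            'properties': {
                'config': {
                    'type': 'object',
                    'properties': {
                        '__line__': {'type': 'integer'},
                        '__uuid__': {},
                        'path': {'type': 'string'},  # hypothetical field
                    },
                    'additionalProperties': False,
                }
            }
        }
    }

    # A well-formed config passes silently.
    jsonschema.validate(
        {'type': 'MyTransform', 'config': {'path': '/tmp/input.txt'}}, entry)

    # A misspelled field name is rejected by additionalProperties: False.
    try:
      jsonschema.validate(
          {'type': 'MyTransform', 'config': {'paht': '/tmp/input.txt'}}, entry)
    except jsonschema.ValidationError as exn:
      print(exn.message)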
sdks/python/apache_beam/yaml/json_utils.py (49 additions, 0 deletions)

@@ -39,6 +39,14 @@
     'string': schema_pb2.STRING,
 }
 
+BEAM_ATOMIC_TYPES_TO_JSON = {
+    schema_pb2.INT16: 'integer',
+    schema_pb2.INT32: 'integer',
+    schema_pb2.FLOAT: 'number',
+    **{v: k
+       for k, v in JSON_ATOMIC_TYPES_TO_BEAM.items()}
+}
+
 
 def json_schema_to_beam_schema(
     json_schema: Dict[str, Any]) -> schema_pb2.Schema:
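The explicit INT16/INT32/FLOAT entries are needed because inverting JSON_ATOMIC_TYPES_TO_BEAM recovers only one Beam type per JSON type: JSON Schema has no fixed-width numerics, so several Beam atomic types collapse onto 'integer' and 'number'. A quick sketch, assuming (as the inversion implies but this hunk does not show) that JSON 'integer' maps to INT64 and 'number' to DOUBLE:

    from apache_beam.portability.api import schema_pb2
    from apache_beam.yaml.json_utils import BEAM_ATOMIC_TYPES_TO_JSON

    # All fixed-width Beam integer types widen to JSON Schema's one integer type.
    assert BEAM_ATOMIC_TYPES_TO_JSON[schema_pb2.INT16] == 'integer'
    assert BEAM_ATOMIC_TYPES_TO_JSON[schema_pb2.INT64] == 'integer'  # assumed via inversion
    # Likewise both floating-point widths become 'number'.
    assert BEAM_ATOMIC_TYPES_TO_JSON[schema_pb2.FLOAT] == 'number'
    assert BEAM_ATOMIC_TYPES_TO_JSON[schema_pb2.DOUBLE] == 'number'  # assumed via inversion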
@@ -97,6 +105,47 @@ def json_type_to_beam_type(json_type: Dict[str, Any]) -> schema_pb2.FieldType:
   raise ValueError(f'Unable to convert {json_type} to a Beam schema.')
 
 
+def beam_type_to_json_type(beam_type: schema_pb2.FieldType) -> Dict[str, Any]:
+  type_info = beam_type.WhichOneof("type_info")
+  if type_info == "atomic_type":
+    if beam_type.atomic_type in BEAM_ATOMIC_TYPES_TO_JSON:
+      return {'type': BEAM_ATOMIC_TYPES_TO_JSON[beam_type.atomic_type]}
+    else:
+      return {}
+  elif type_info == "array_type":
+    return {
+        'type': 'array',
+        'items': beam_type_to_json_type(beam_type.array_type.element_type)
+    }
+  elif type_info == "iterable_type":
+    return {
+        'type': 'array',
+        'items': beam_type_to_json_type(beam_type.iterable_type.element_type)
+    }
+  elif type_info == "map_type":
+    return {
+        'type': 'object',
+        'properties': {
+            '__line__': {
+                'type': 'integer'
+            }, '__uuid__': {}
+        },
+        'additionalProperties': beam_type_to_json_type(
+            beam_type.map_type.value_type)
+    }
+  elif type_info == "row_type":
+    return {
+        'type': 'object',
+        'properties': {
+            field.name: beam_type_to_json_type(field.type)
+            for field in beam_type.row_type.schema.fields
+        },
+        'additionalProperties': False
+    }
+  else:
+    return {}
+
+
 def json_to_row(beam_type: schema_pb2.FieldType) -> Callable[[Any], Any]:
   """Returns a callable converting Json objects to Beam rows of the given type.
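As an illustration of the new helper: a Beam array<string> field type becomes a JSON Schema array of strings, while types with no JSON analogue fall back to the empty schema {}, which matches anything rather than over-constraining. A sketch, assuming an environment with apache_beam installed:

    from apache_beam.portability.api import schema_pb2
    from apache_beam.yaml import json_utils

    beam_type = schema_pb2.FieldType(
        array_type=schema_pb2.ArrayType(
            element_type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING)))

    print(json_utils.beam_type_to_json_type(beam_type))
    # {'type': 'array', 'items': {'type': 'string'}}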
sdks/python/apache_beam/yaml/main.py (8 additions, 1 deletion)

@@ -39,6 +39,12 @@ def _configure_parser(argv):
       '--pipeline_spec_file',
       '--yaml_pipeline_file',
       help='A file containing a yaml description of the pipeline to run.')
+  parser.add_argument(
+      '--json_schema_validation',
+      default='generic',
+      help='none: do no pipeline validation against the schema; '
+      'generic: validate the pipeline shape, but not individual transforms; '
+      'per_transform: also validate the config of known transforms')
   return parser.parse_known_args(argv)


@@ -72,7 +78,8 @@ def run(argv=None):
                       'options', {}))),
       display_data={'yaml': pipeline_yaml}) as p:
     print("Building pipeline...")
-    yaml_transform.expand_pipeline(p, pipeline_spec)
+    yaml_transform.expand_pipeline(
+        p, pipeline_spec, validate_schema=known_args.json_schema_validation)
     print("Running pipeline...")


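With the flag wired through to expand_pipeline, validation strictness can be chosen at launch time; for example (per_transform additionally requires the generated transforms.schema.yaml, per the RuntimeError hint in the yaml_transform.py diff below):

    python -m apache_beam.yaml.main \
        --yaml_pipeline_file=pipeline.yaml \
        --json_schema_validation=per_transform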
sdks/python/apache_beam/yaml/yaml_transform.py (34 additions, 9 deletions)

@@ -16,6 +16,7 @@
 #
 
 import collections
+import functools
 import json
 import logging
 import os
@@ -48,17 +49,41 @@
 except ImportError:
   jsonschema = None
 
 if jsonschema is not None:
-  with open(os.path.join(os.path.dirname(__file__),
-                         'pipeline.schema.yaml')) as yaml_file:
-    pipeline_schema = yaml.safe_load(yaml_file)
-
-
-def validate_against_schema(pipeline):
-  try:
-    jsonschema.validate(pipeline, pipeline_schema)
-  except jsonschema.ValidationError as exn:
-    exn.message += f" at line {SafeLineLoader.get_line(exn.instance)}"
-    raise exn
+
+  @functools.lru_cache
+  def pipeline_schema(strictness):
+    with open(os.path.join(os.path.dirname(__file__),
+                           'pipeline.schema.yaml')) as yaml_file:
+      pipeline_schema = yaml.safe_load(yaml_file)
+    if strictness == 'per_transform':
+      transform_schemas_path = os.path.join(
+          os.path.dirname(__file__), 'transforms.schema.yaml')
+      if not os.path.exists(transform_schemas_path):
+        raise RuntimeError(
+            "Please run "
+            "python -m apache_beam.yaml.generate_yaml_docs "
+            f"--schema_file='{transform_schemas_path}' "
+            "to run with transform-specific validation.")
+      with open(transform_schemas_path) as fin:
+        pipeline_schema['$defs']['transform']['allOf'].extend(
+            yaml.safe_load(fin))
+    return pipeline_schema
+
+  def _closest_line(o, path):
+    best_line = SafeLineLoader.get_line(o)
+    for step in path:
+      o = o[step]
+      maybe_line = SafeLineLoader.get_line(o)
+      if maybe_line != 'unknown':
+        best_line = maybe_line
+    return best_line
+
+  def validate_against_schema(pipeline, strictness):
+    try:
+      jsonschema.validate(pipeline, pipeline_schema(strictness))
+    except jsonschema.ValidationError as exn:
+      exn.message += f" around line {_closest_line(pipeline, exn.path)}"
+      raise exn
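The error-reporting change matters because the failing instance can be a plain scalar that carries no line annotation; _closest_line instead walks exn.path (jsonschema's key path from the pipeline root down to the failing value) and reports the deepest node that still has one. A rough sketch of the intended use, assuming jsonschema is installed:

    import yaml
    from apache_beam.yaml import yaml_transform

    spec = yaml.load(
        '''
        pipeline:
          transforms:
            - type: Create
              config:
                elements: [1, 2, 3]
        ''',
        Loader=yaml_transform.SafeLineLoader)

    # Raises jsonschema.ValidationError with an "around line N" suffix
    # if the spec does not conform to the pipeline schema.
    yaml_transform.validate_against_schema(spec, 'generic')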


@@ -989,13 +1014,13 @@ def expand_pipeline(
     pipeline,
     pipeline_spec,
     providers=None,
-    validate_schema=jsonschema is not None):
+    validate_schema='generic' if jsonschema is not None else None):
   if isinstance(pipeline_spec, str):
     pipeline_spec = yaml.load(pipeline_spec, Loader=SafeLineLoader)
   # TODO(robertwb): It's unclear whether this gives as good of errors, but
   # this could certainly be handy as a first pass when Beam is not available.
-  if validate_schema:
-    validate_against_schema(pipeline_spec)
+  if validate_schema and validate_schema != 'none':
+    validate_against_schema(pipeline_spec, validate_schema)
   # Calling expand directly to avoid outer layer of nesting.
   return YamlTransform(
       pipeline_as_composite(pipeline_spec['pipeline']),
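The new default preserves the old opt-out: 'generic' when jsonschema is importable, None (skip) otherwise, and the string 'none' from the command-line flag also disables validation. A sketch of the three mutually exclusive call patterns, where p is an open Pipeline and pipeline_spec a YAML string:

    from apache_beam.yaml import yaml_transform

    # Skip validation entirely.
    yaml_transform.expand_pipeline(p, pipeline_spec, validate_schema='none')
    # Or: validate overall pipeline shape only (the default with jsonschema present).
    yaml_transform.expand_pipeline(p, pipeline_spec, validate_schema='generic')
    # Or: additionally validate each known transform's config.
    yaml_transform.expand_pipeline(
        p, pipeline_spec, validate_schema='per_transform')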
