Add a simple validation transform to yaml. #32956
@@ -106,6 +106,18 @@ def json_type_to_beam_type(json_type: Dict[str, Any]) -> schema_pb2.FieldType:
     raise ValueError(f'Unable to convert {json_type} to a Beam schema.')
 
 
+def beam_schema_to_json_schema(
+    beam_schema: schema_pb2.Schema) -> Dict[str, Any]:
+  return {
+      'type': 'object',
+      'properties': {
+          field.name: beam_type_to_json_type(field.type)
+          for field in beam_schema.fields
+      },
+      'additionalProperties': False
+  }
+
+
 def beam_type_to_json_type(beam_type: schema_pb2.FieldType) -> Dict[str, Any]:
   type_info = beam_type.WhichOneof("type_info")
   if type_info == "atomic_type":
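As a rough illustration (not part of the diff), here is a minimal sketch of what the new `beam_schema_to_json_schema` helper produces for a simple schema. The proto construction assumes the standard `apache_beam.portability.api.schema_pb2` messages, and the per-field output depends on `beam_type_to_json_type`'s atomic-type mapping:

```python
# Sketch only (not from the PR): assumes beam_schema_to_json_schema and
# beam_type_to_json_type from the module modified in this diff are importable.
from apache_beam.portability.api import schema_pb2

# A hypothetical Beam schema with an int64 "id" and a string "name" field.
beam_schema = schema_pb2.Schema(
    fields=[
        schema_pb2.Field(
            name='id',
            type=schema_pb2.FieldType(atomic_type=schema_pb2.AtomicType.INT64)),
        schema_pb2.Field(
            name='name',
            type=schema_pb2.FieldType(atomic_type=schema_pb2.AtomicType.STRING)),
    ])

# Expected shape; the per-field values come from beam_type_to_json_type's
# atomic-type mapping:
# {'type': 'object',
#  'properties': {'id': {...}, 'name': {...}},
#  'additionalProperties': False}
print(beam_schema_to_json_schema(beam_schema))
```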
@@ -267,3 +279,53 @@ def json_formater(
   convert = row_to_json(
       schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
   return lambda row: json.dumps(convert(row), sort_keys=True).encode('utf-8')
+
+
+def _validate_compatible(weak_schema, strong_schema):
+  if not weak_schema:
+    return
+  if weak_schema['type'] != strong_schema['type']:
+    raise ValueError(
+        'Incompatible types: %r vs %r' %
+        (weak_schema['type'], strong_schema['type']))
+  if weak_schema['type'] == 'array':
+    _validate_compatible(weak_schema['items'], strong_schema['items'])
+  elif weak_schema['type'] == 'object':
+    for required in strong_schema.get('required', []):
+      if required not in weak_schema['properties']:
+        raise ValueError('Missing or unknown property %r' % required)
+    for name, spec in weak_schema.get('properties', {}).items():
+      if name in strong_schema['properties']:
+        try:
+          _validate_compatible(spec, strong_schema['properties'][name])
+        except Exception as exn:
+          raise ValueError('Incompatible schema for %r' % name) from exn
+      elif not strong_schema.get('additionalProperties'):
+        raise ValueError(
+            'Prohibited property: %r; '
+            'perhaps additionalProperties: False is missing?' % name)
+
+
+def row_validator(beam_schema: schema_pb2.Schema,
+                  json_schema: Dict[str, Any]) -> Callable[[Any], Any]:
+  """Returns a callable that will fail on elements not respecting json_schema.
+  """
+  if not json_schema:
+    return lambda x: None
Comment on lines +313 to +314

Reviewer: Presumably this never happens since json_schema is a required parameter to the Validate transform, but wouldn't this also imply that if a json_schema is not given, it will pass silently? Nit, but perhaps having an error or warn log on compilation here instead would make sense.

Author: This handles the degenerate case where the schema is `{}`. Though one could ask why one would even have this transform at all, it's quite possible the schema is provided elsewhere and we want to handle this case gracefully (similar to how empty lists are handled despite being degenerate).

Reviewer: Ah ok, I was unaware of the `{}` case.
+
+  # Validate that this compiles, but avoid pickling the validator itself.
+  _ = jsonschema.validators.validator_for(json_schema)(json_schema)
+  _validate_compatible(beam_schema_to_json_schema(beam_schema), json_schema)
+  validator = None
+
+  convert = row_to_json(
+      schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=beam_schema)))
+
+  def validate(row):
+    nonlocal validator
+    if validator is None:
+      validator = jsonschema.validators.validator_for(json_schema)(
+          json_schema)
+    validator.validate(convert(row))
+
+  return validate
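A hedged usage sketch for the new `row_validator` (not part of the diff): it assumes the helpers above are importable, that rows expose their fields as attributes (as `beam.Row` does), and an illustrative `json_schema` of my own choosing; the last lines show the degenerate empty-schema case discussed in the thread above.

```python
import apache_beam as beam
from apache_beam.portability.api import schema_pb2

# Hypothetical Beam schema: rows with a single string field "name".
beam_schema = schema_pb2.Schema(
    fields=[
        schema_pb2.Field(
            name='name',
            type=schema_pb2.FieldType(atomic_type=schema_pb2.AtomicType.STRING)),
    ])

# A stricter (illustrative) JSON schema: "name" is required and non-empty.
json_schema = {
    'type': 'object',
    'properties': {'name': {'type': 'string', 'minLength': 1}},
    'required': ['name'],
    'additionalProperties': False,
}

validate = row_validator(beam_schema, json_schema)
validate(beam.Row(name='ok'))  # returns None; the row conforms
validate(beam.Row(name=''))    # raises jsonschema.exceptions.ValidationError

# The degenerate case discussed above: an empty json_schema yields a
# no-op validator rather than an error.
noop = row_validator(beam_schema, {})
noop(beam.Row(name='anything'))  # always passes
```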
Another review thread:

Reviewer: Similar to my other comment, but wouldn't this pass silently? I'm not sure how it would reach this state in the first place, but if the incoming PCollection does not have a schema (perhaps if preceded by a transform that does not output Row?), this would pass validation even if given a json schema to validate against.

Author: Again, weak_schema could be `{}`. This comes up in practice if we don't know anything about this part of the input (e.g. it's `Any`), which is also the fallback for `beam_type_to_json_type`.
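To make the `{}` behaviour in this thread concrete, a small sketch using `_validate_compatible` as added in this diff (assuming it is importable from the modified module):

```python
# An empty weak schema means "nothing is known about the input"
# (e.g. it is Any), so any strong schema is accepted without error.
_validate_compatible({}, {'type': 'object', 'properties': {}})

# Known but mismatched types are rejected eagerly with a ValueError.
try:
  _validate_compatible({'type': 'string'}, {'type': 'integer'})
except ValueError as exn:
  print(exn)  # Incompatible types: 'string' vs 'integer'
```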