Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAT: check records to comply jsonschema format field #5661

Merged
merged 4 commits into from
Sep 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte-integrations/bases/source-acceptance-test/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"pytest-timeout~=1.4",
"pprintpp~=0.4",
"dpath~=2.0.1",
"jsonschema~=3.2.0",
]

setuptools.setup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,41 @@
#

import logging
import re
import pendulum
from collections import defaultdict
from typing import List, Mapping

from airbyte_cdk.models import AirbyteRecordMessage, ConfiguredAirbyteCatalog
from jsonschema import Draft4Validator, ValidationError
from jsonschema import Draft7Validator, FormatChecker, ValidationError, FormatError

timestamp_regex = re.compile(("^\d{4}-\d?\d-\d?\d" # date
"(\s|T)" # separator
"\d?\d:\d?\d:\d?\d(.\d+)?" # time
".*$" #timezone
))
Comment on lines +34 to +38
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a standard (e.g. ISO 8601) that this regex is based on?

Whether based on a standard or custom, it would be useful to add a description in the docs (somewhere on this page perhaps) so that date/time format requirements of a connector are clear when building one.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, its based on bigquery timestamp format (https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type) as its most strict of sql timestamp formats we are supporting

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha, so we're lining it up to our most strict destination to ensure compatibility. Makes sense. Small piece in creating connector docs explaining that (with some examples) would be great.



class CustomFormatChecker(FormatChecker):

@staticmethod
def check_datetime(value: str) -> bool:
valid_format = timestamp_regex.match(value)
try:
pendulum.parse(value, strict=False)
except ValueError:
valid_time = False
else:
valid_time = True
return valid_format and valid_time

def check(self, instance, format):
if format == "date-time":
if not self.check_datetime(instance):
raise FormatError(f"{instance} has invalid datetime format")
else:
return super().check(instance, format)



def verify_records_schema(
Expand All @@ -38,7 +68,7 @@ def verify_records_schema(
"""
validators = {}
for stream in catalog.streams:
validators[stream.stream.name] = Draft4Validator(stream.stream.json_schema)
validators[stream.stream.name] = Draft7Validator(stream.stream.json_schema, format_checker=CustomFormatChecker())

stream_errors = defaultdict(dict)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ def record_schema_fixture():


@pytest.fixture(name="configured_catalog")
def catalog_fixture(record_schema) -> ConfiguredAirbyteCatalog:
def catalog_fixture(request, record_schema) -> ConfiguredAirbyteCatalog:
record_schema = request.param if hasattr(request, "param") else record_schema
stream = ConfiguredAirbyteStream(
stream=AirbyteStream(name="my_stream", json_schema=record_schema),
sync_mode=SyncMode.full_refresh,
Expand Down Expand Up @@ -96,3 +97,61 @@ def test_verify_records_schema(configured_catalog: ConfiguredAirbyteCatalog):
assert len(streams_with_errors) == 1, "only one stream"
assert len(streams_with_errors["my_stream"]) == 3, "only first error for each field"
assert errors == ["123 is not of type 'null', 'string'", "'text' is not of type 'number'", "None is not of type 'string'"]


@pytest.mark.parametrize(
"record, configured_catalog, valid",
[
# Send null data
({"a": None}, {"type": "object", "properties": {"a": {"type": "string", "format": "time"}}}, False),
# time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You have tests only for time and date-time. What about date? Could you please add tests for date format also?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

({"a": "sdf"}, {"type": "object", "properties": {"a": {"type": "string", "format": "time"}}}, False),
({"a": "12:00"}, {"type": "object", "properties": {"a": {"type": "string", "format": "time"}}}, False),
({"a": "12:00:90"}, {"type": "object", "properties": {"a": {"type": "string", "format": "time"}}}, False),
({"a": "12:00:22"}, {"type": "object", "properties": {"a": {"type": "string", "format": "time"}}}, True),
# date
({"a": "12:00:90"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date"}}}, False),
({"a": "2020-12-20"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date"}}}, True),
({"a": "2020-20-20"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date"}}}, False),
# date-time
# full date-time format with timezone only valid, according to https://datatracker.ietf.org/doc/html/rfc3339#section-5.6
({"a": "12:11:00"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, False),
({"a": "2018-11-13 20:20:39"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, True),
({"a": "2021-08-10T12:43:15"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, True),
({"a": "2021-08-10T12:43:15Z"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, True),
({"a": "2018-11-13T20:20:39+00:00"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, True),
({"a": "2018-21-13T20:20:39+00:00"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, False),
# This is valid for postgres sql but not valid for bigquery
({"a": "2014-09-27 9:35z"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, False),
# Seconds are obligatory for bigquery timestamp
({"a": "2014-09-27 9:35"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, False),
({"a": "2014-09-27 9:35:0z"}, {"type": "object", "properties": {"a": {"type": "string", "format": "date-time"}}}, True),
# email
({"a": "2018-11-13 20:20:39"}, {"type": "object", "properties": {"a": {"type": "string", "format": "email"}}}, False),
({"a": "[email protected]"}, {"type": "object", "properties": {"a": {"type": "string", "format": "email"}}}, True),
({"a": "Пример@example.com"}, {"type": "object", "properties": {"a": {"type": "string", "format": "email"}}}, True),
({"a": "写电子邮件@子邮件"}, {"type": "object", "properties": {"a": {"type": "string", "format": "email"}}}, True),
# hostname
({"a": "2018-11-13 20:20:39"}, {"type": "object", "properties": {"a": {"type": "string", "format": "hostname"}}}, False),
({"a": "[email protected]"}, {"type": "object", "properties": {"a": {"type": "string", "format": "hostname"}}}, False),
({"a": "localhost"}, {"type": "object", "properties": {"a": {"type": "string", "format": "hostname"}}}, True),
({"a": "example.com"}, {"type": "object", "properties": {"a": {"type": "string", "format": "hostname"}}}, True),
# ipv4
({"a": "example.com"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv4"}}}, False),
({"a": "0.0.0.1000"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv4"}}}, False),
({"a": "0.0.0.0"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv4"}}}, True),
# ipv6
({"a": "example.com"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv6"}}}, False),
({"a": "1080:0:0:0:8:800:200C:417A"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv6"}}}, True),
({"a": "::1"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv6"}}}, True),
({"a": "::"}, {"type": "object", "properties": {"a": {"type": "string", "format": "ipv6"}}}, True),
],
indirect=["configured_catalog"],
)
def test_validate_records_format(record, configured_catalog, valid):
records = [AirbyteRecordMessage(stream="my_stream", data=record, emitted_at=0)]
streams_with_errors = verify_records_schema(records, configured_catalog)
if valid:
assert not streams_with_errors
else:
assert streams_with_errors, f"Record {record} should produce errors against {configured_catalog.streams[0].stream.json_schema}"
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ Each stream should have some data, if you can't guarantee this for particular st
||x||
|||x|
||||
### Schema format checking

If some field has [format](https://json-schema.org/understanding-json-schema/reference/string.html#format) attribute specified on its catalog json schema, Source Acceptance Testing framework performs checking against format. It support checking of all [builtin](https://json-schema.org/understanding-json-schema/reference/string.html#built-in-formats) jsonschema formats for draft 7 specification: email, hostnames, ip addresses, time, date and date-time formats.

Note: For date-time we are not checking against compliance against ISO8601 (and RFC3339 as subset of it). Since we are using specified format to set database column type on db normalization stage, value should be compliant to bigquery [timestamp](https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type) and SQL "timestamp with timezone" formats.
### Example of `expected_records.txt`:
In general, the expected_records.json should contain the subset of output of the records of particular stream you need to test.
The required fields are: `stream, data, emitted_at`
Expand Down