diff --git a/noxfile.py b/noxfile.py index b0dd90700..256badd91 100644 --- a/noxfile.py +++ b/noxfile.py @@ -48,6 +48,7 @@ "pytest-snapshot", "pyarrow", "requests-mock", + "rfc3339-validator", "time-machine", ] diff --git a/poetry.lock b/poetry.lock index c3c7c88b9..c1d6278ec 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1095,16 +1095,6 @@ files = [ {file = "MarkupSafe-2.1.3-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:5bbe06f8eeafd38e5d0a4894ffec89378b6c6a625ff57e3028921f8ff59318ac"}, {file = "MarkupSafe-2.1.3-cp311-cp311-win32.whl", hash = "sha256:dd15ff04ffd7e05ffcb7fe79f1b98041b8ea30ae9234aed2a9168b5797c3effb"}, {file = "MarkupSafe-2.1.3-cp311-cp311-win_amd64.whl", hash = "sha256:134da1eca9ec0ae528110ccc9e48041e0828d79f24121a1a146161103c76e686"}, - {file = "MarkupSafe-2.1.3-cp312-cp312-macosx_10_9_universal2.whl", hash = "sha256:f698de3fd0c4e6972b92290a45bd9b1536bffe8c6759c62471efaa8acb4c37bc"}, - {file = "MarkupSafe-2.1.3-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:aa57bd9cf8ae831a362185ee444e15a93ecb2e344c8e52e4d721ea3ab6ef1823"}, - {file = "MarkupSafe-2.1.3-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:ffcc3f7c66b5f5b7931a5aa68fc9cecc51e685ef90282f4a82f0f5e9b704ad11"}, - {file = "MarkupSafe-2.1.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:47d4f1c5f80fc62fdd7777d0d40a2e9dda0a05883ab11374334f6c4de38adffd"}, - {file = "MarkupSafe-2.1.3-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:1f67c7038d560d92149c060157d623c542173016c4babc0c1913cca0564b9939"}, - {file = "MarkupSafe-2.1.3-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:9aad3c1755095ce347e26488214ef77e0485a3c34a50c5a5e2471dff60b9dd9c"}, - {file = "MarkupSafe-2.1.3-cp312-cp312-musllinux_1_1_i686.whl", hash = "sha256:14ff806850827afd6b07a5f32bd917fb7f45b046ba40c57abdb636674a8b559c"}, - {file = "MarkupSafe-2.1.3-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8f9293864fe09b8149f0cc42ce56e3f0e54de883a9de90cd427f191c346eb2e1"}, - {file = "MarkupSafe-2.1.3-cp312-cp312-win32.whl", hash = "sha256:715d3562f79d540f251b99ebd6d8baa547118974341db04f5ad06d5ea3eb8007"}, - {file = "MarkupSafe-2.1.3-cp312-cp312-win_amd64.whl", hash = "sha256:1b8dd8c3fd14349433c79fa8abeb573a55fc0fdd769133baac1f5e07abf54aeb"}, {file = "MarkupSafe-2.1.3-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:8e254ae696c88d98da6555f5ace2279cf7cd5b3f52be2b5cf97feafe883b58d2"}, {file = "MarkupSafe-2.1.3-cp37-cp37m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:cb0932dc158471523c9637e807d9bfb93e06a95cbf010f1a38b98623b929ef2b"}, {file = "MarkupSafe-2.1.3-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:9402b03f1a1b4dc4c19845e5c749e3ab82d5078d16a2a4c2cd2df62d57bb0707"}, @@ -1950,7 +1940,6 @@ files = [ {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938"}, {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d"}, {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515"}, - {file = "PyYAML-6.0.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:326c013efe8048858a6d312ddd31d56e468118ad4cdeda36c719bf5bb6192290"}, {file = "PyYAML-6.0.1-cp310-cp310-win32.whl", hash = "sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924"}, {file = "PyYAML-6.0.1-cp310-cp310-win_amd64.whl", hash = "sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d"}, {file = "PyYAML-6.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007"}, @@ -1958,15 +1947,8 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d"}, {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc"}, {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673"}, - {file = "PyYAML-6.0.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:e7d73685e87afe9f3b36c799222440d6cf362062f78be1013661b00c5c6f678b"}, {file = "PyYAML-6.0.1-cp311-cp311-win32.whl", hash = "sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741"}, {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, - {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, - {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, - {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, - {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, - {file = "PyYAML-6.0.1-cp312-cp312-win_amd64.whl", hash = "sha256:0d3304d8c0adc42be59c5f8a4d9e3d7379e6955ad754aa9d6ab7a398b59dd1df"}, {file = "PyYAML-6.0.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47"}, {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98"}, {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c"}, @@ -1983,7 +1965,6 @@ files = [ {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5"}, {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696"}, {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735"}, - {file = "PyYAML-6.0.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:49a183be227561de579b4a36efbb21b3eab9651dd81b1858589f796549873dd6"}, {file = "PyYAML-6.0.1-cp38-cp38-win32.whl", hash = "sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206"}, {file = "PyYAML-6.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62"}, {file = "PyYAML-6.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8"}, @@ -1991,7 +1972,6 @@ files = [ {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6"}, {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0"}, {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c"}, - {file = "PyYAML-6.0.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5"}, {file = "PyYAML-6.0.1-cp39-cp39-win32.whl", hash = "sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c"}, {file = "PyYAML-6.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486"}, {file = "PyYAML-6.0.1.tar.gz", hash = "sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43"}, @@ -2052,6 +2032,20 @@ six = "*" fixture = ["fixtures"] test = ["fixtures", "mock", "purl", "pytest", "requests-futures", "sphinx", "testtools"] +[[package]] +name = "rfc3339-validator" +version = "0.1.4" +description = "A pure python RFC3339 validator" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" +files = [ + {file = "rfc3339_validator-0.1.4-py2.py3-none-any.whl", hash = "sha256:24f6ec1eda14ef823da9e36ec7113124b39c04d50a4d3d3a3c2859577e7791fa"}, + {file = "rfc3339_validator-0.1.4.tar.gz", hash = "sha256:138a2abdf93304ad60530167e51d2dfb9549521a836871b88d7f4695d0022f6b"}, +] + +[package.dependencies] +six = "*" + [[package]] name = "rpds-py" version = "0.16.2" @@ -3054,4 +3048,4 @@ testing = ["pytest", "pytest-durations"] [metadata] lock-version = "2.0" python-versions = ">=3.7.1" -content-hash = "f778cde798db60fdbb5eb7522124aa9fe64c30b2d03edc38a6e1d6b7922064b9" +content-hash = "83d7ca48f338b584d732bcab1e7c4d24569443fd9693d2d1687d749a5c56d5f0" diff --git a/pyproject.toml b/pyproject.toml index 3c888b430..6bb31917e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -137,6 +137,7 @@ mypy = [ pytest-benchmark = ">=4.0.0" pytest-snapshot = ">=0.9.0" requests-mock = ">=1.10.0" +rfc3339-validator = ">=0.1.4" time-machine = [ { version = ">=2.10.0,<2.11", python = "<3.8" }, { version = ">=2.10.0", python = ">=3.8" }, diff --git a/singer_sdk/exceptions.py b/singer_sdk/exceptions.py index 75135e800..03b06a7c0 100644 --- a/singer_sdk/exceptions.py +++ b/singer_sdk/exceptions.py @@ -138,3 +138,22 @@ class ConformedNameClashException(Exception): class MissingKeyPropertiesError(Exception): """Raised when a recieved (and/or transformed) record is missing key properties.""" + + +class InvalidJSONSchema(Exception): + """Raised when a JSON schema is invalid.""" + + +class InvalidRecord(Exception): + """Raised when a stream record is invalid according to its declared schema.""" + + def __init__(self, error_message: str, record: dict) -> None: + """Initialize an InvalidRecord exception. + + Args: + error_message: A message describing the error. + record: The invalid record. + """ + super().__init__(f"Record Message Validation Error: {error_message}") + self.error_message = error_message + self.record = record diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index f1a7c0f92..a85a1651c 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -13,9 +13,14 @@ from gzip import open as gzip_open from types import MappingProxyType -from jsonschema import Draft7Validator +import jsonschema +from typing_extensions import override -from singer_sdk.exceptions import MissingKeyPropertiesError +from singer_sdk.exceptions import ( + InvalidJSONSchema, + InvalidRecord, + MissingKeyPropertiesError, +) from singer_sdk.helpers._batch import ( BaseBatchFileEncoding, BatchConfig, @@ -39,7 +44,83 @@ from singer_sdk.target_base import Target -JSONSchemaValidator = Draft7Validator + +class BaseJSONSchemaValidator(abc.ABC): + """Abstract base class for JSONSchema validator.""" + + def __init__(self, schema: dict[str, t.Any]) -> None: + """Initialize the record validator. + + Args: + schema: Schema of the stream to sink. + """ + self.schema = schema + + @abc.abstractmethod + def validate(self, record: dict[str, t.Any]) -> None: + """Validate a record message. + + This method MUST raise an ``InvalidRecord`` exception if the record is invalid. + + Args: + record: Record message to validate. + """ + + +class JSONSchemaValidator(BaseJSONSchemaValidator): + """Validate records using the ``fastjsonschema`` library.""" + + def __init__( + self, + schema: dict, + *, + validate_formats: bool = False, + format_checker: jsonschema.FormatChecker | None = None, + ): + """Initialize the validator. + + Args: + schema: Schema of the stream to sink. + validate_formats: Whether JSON string formats (e.g. ``date-time``) should + be validated. + format_checker: User-defined format checker. + + Raises: + InvalidJSONSchema: If the schema provided from tap or mapper is invalid. + """ + jsonschema_validator = jsonschema.Draft7Validator + + super().__init__(schema) + if validate_formats: + format_checker = format_checker or jsonschema_validator.FORMAT_CHECKER + else: + format_checker = jsonschema.FormatChecker(formats=()) + + try: + jsonschema_validator.check_schema(schema) + except jsonschema.SchemaError as e: + error_message = f"Schema Validation Error: {e}" + raise InvalidJSONSchema(error_message) from e + + self.validator = jsonschema_validator( + schema=schema, + format_checker=format_checker, + ) + + @override + def validate(self, record: dict): # noqa: ANN201 + """Validate a record message. + + Args: + record: Record message to validate. + + Raises: + InvalidRecord: If the record is invalid. + """ + try: + self.validator.validate(record) + except jsonschema.ValidationError as e: + raise InvalidRecord(e.message, record) from e class Sink(metaclass=abc.ABCMeta): @@ -51,6 +132,15 @@ class Sink(metaclass=abc.ABCMeta): MAX_SIZE_DEFAULT = 10000 + validate_schema = True + """Enable JSON schema record validation.""" + + validate_field_string_format = False + """Enable JSON schema format validation, for example `date-time` string fields.""" + + fail_on_record_validation_exception: bool = True + """Interrupt the target execution when a record fails schema validation.""" + def __init__( self, target: Target, @@ -95,10 +185,23 @@ def __init__( self._batch_records_read: int = 0 self._batch_dupe_records_merged: int = 0 - self._validator = Draft7Validator( - schema, - format_checker=Draft7Validator.FORMAT_CHECKER, - ) + self._validator: BaseJSONSchemaValidator | None = self.get_validator() + + def get_validator(self) -> BaseJSONSchemaValidator | None: + """Get a record validator for this sink. + + Override this method to use a custom format validator, or disable record + validation by returning `None`. + + Returns: + An instance of a subclass of ``BaseJSONSchemaValidator``. + """ + if self.validate_schema: + return JSONSchemaValidator( + self.schema, + validate_formats=self.validate_field_string_format, + ) + return None def _get_context(self, record: dict) -> dict: # noqa: ARG002 """Return an empty dictionary by default. @@ -328,8 +431,20 @@ def _validate_and_parse(self, record: dict) -> dict: Returns: TODO + + Raises: + InvalidRecord: If the record is invalid. """ - self._validator.validate(record) + if self._validator is not None: + # TODO: Check the performance impact of this try/except block. It runs + # on every record, so it's probably bad and should be moved up the stack. + try: + self._validator.validate(record) + except InvalidRecord as e: + if self.fail_on_record_validation_exception: + raise + self.logger.exception("Record validation failed %s", e) + self._parse_timestamps_in_record( record=record, schema=self.schema, diff --git a/tests/core/sinks/test_type_checker.py b/tests/core/sinks/test_type_checker.py new file mode 100644 index 000000000..28376d5d9 --- /dev/null +++ b/tests/core/sinks/test_type_checker.py @@ -0,0 +1,100 @@ +"""Test the custom type validator.""" + +from __future__ import annotations + +import pytest +from typing_extensions import override + +from singer_sdk.sinks.core import BaseJSONSchemaValidator, InvalidJSONSchema, Sink +from singer_sdk.target_base import Target + + +@pytest.fixture +def test_schema_invalid(): + """Return a test schema with an invalid type.""" + + return { + "type": "object", + "properties": { + "datetime_col": {"type": "ssttrriinngg", "format": "date-time"}, + }, + } + + +@pytest.fixture +def target(): + """Return a target object.""" + + class CustomTarget(Target): + name = "test_target" + + return CustomTarget() + + +def test_default_schema_type_checks(target, test_schema_invalid): + """Test type checks on _validator initialization.""" + + class CustomSink(Sink): + """Custom sink class.""" + + @override + def process_batch(self, context: dict) -> None: + pass + + @override + def process_record(self, record: dict, context: dict) -> None: + pass + + with pytest.raises( + InvalidJSONSchema, + match=r"Schema Validation Error: 'ssttrriinngg' is not valid under any", + ): + CustomSink(target, "test_stream", test_schema_invalid, None) + + +def test_disable_schema_type_checks_returning_none(target, test_schema_invalid): + """Test type checks on _validator initialization.""" + + class CustomSink(Sink): + """Custom sink class.""" + + @override + def get_validator(self) -> BaseJSONSchemaValidator | None: + """Get a record validator for this sink. + + Override this method to use a custom format validator + or disable jsonschema validator, by returning `None`. + + Returns: + An instance of a subclass of ``BaseJSONSchemaValidator``. + """ + return None + + @override + def process_batch(self, context: dict) -> None: + pass + + @override + def process_record(self, record: dict, context: dict) -> None: + pass + + CustomSink(target, "test_stream", test_schema_invalid, None) + + +def test_disable_schema_type_checks_setting_false(target, test_schema_invalid): + """Test type checks on _validator initialization.""" + + class CustomSink(Sink): + """Custom sink class.""" + + validate_schema = False + + @override + def process_batch(self, context: dict) -> None: + pass + + @override + def process_record(self, record: dict, context: dict) -> None: + pass + + CustomSink(target, "test_stream", test_schema_invalid, None) diff --git a/tests/core/sinks/test_validation.py b/tests/core/sinks/test_validation.py index 0672c9f49..e583fddad 100644 --- a/tests/core/sinks/test_validation.py +++ b/tests/core/sinks/test_validation.py @@ -5,6 +5,7 @@ import pytest +from singer_sdk.exceptions import InvalidRecord from tests.conftest import BatchSinkMock, TargetMock @@ -58,6 +59,120 @@ def test_validate_record(): assert updated_record["invalid_datetime"] == "9999-12-31 23:59:59.999999" +@pytest.fixture +def draft7_sink_stop(): + """Return a sink object with Draft7 checks enabled.""" + + class CustomSink(BatchSinkMock): + """Custom sink class.""" + + validate_field_string_format = True + + return CustomSink( + TargetMock(), + "users", + { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "created_at": {"type": "string", "format": "date-time"}, + "created_at_date": {"type": "string", "format": "date"}, + "created_at_time": {"type": "string", "format": "time"}, + "invalid_datetime": {"type": "string", "format": "date-time"}, + }, + }, + ["id"], + ) + + +@pytest.fixture +def draft7_sink_continue(): + """Return a sink object with Draft7 checks enabled.""" + + class CustomSink(BatchSinkMock): + """Custom sink class.""" + + validate_field_string_format = True + fail_on_record_validation_exception = False + + return CustomSink( + TargetMock(), + "users", + { + "type": "object", + "properties": { + "id": {"type": "integer"}, + "created_at": {"type": "string", "format": "date-time"}, + "created_at_date": {"type": "string", "format": "date"}, + "created_at_time": {"type": "string", "format": "time"}, + "invalid_datetime": {"type": "string", "format": "date-time"}, + }, + }, + ["id"], + ) + + +def test_validate_record_jsonschema_format_checking_enabled_stop_on_error( + draft7_sink_stop, +): + sink: BatchSinkMock = draft7_sink_stop + + record = { + "id": 1, + "created_at": "2021-01-01T00:00:00+00:00", + "created_at_date": "2021-01-01", + "created_at_time": "00:01:00+00:00", + "missing_datetime": "2021-01-01T00:00:00+00:00", + "invalid_datetime": "not a datetime", + } + with pytest.raises( + InvalidRecord, + match=r"Record Message Validation Error", + ): + sink._validate_and_parse(record) + + +def test_validate_record_jsonschema_format_checking_enabled_continue_on_error( + capsys: pytest.CaptureFixture, + draft7_sink_continue, +): + sink: BatchSinkMock = draft7_sink_continue + + record = { + "id": 1, + "created_at": "2021-01-01T00:00:00+00:00", + "created_at_date": "2021-01-01", + "created_at_time": "00:01:00+00:00", + "missing_datetime": "2021-01-01T00:00:00+00:00", + "invalid_datetime": "not a datetime", + } + + updated_record = sink._validate_and_parse(record) + captured = capsys.readouterr() + + assert updated_record["created_at"] == datetime.datetime( + 2021, + 1, + 1, + 0, + 0, + tzinfo=datetime.timezone.utc, + ) + assert updated_record["created_at_date"] == datetime.date( + 2021, + 1, + 1, + ) + assert updated_record["created_at_time"] == datetime.time( + 0, + 1, + tzinfo=datetime.timezone.utc, + ) + assert updated_record["missing_datetime"] == "2021-01-01T00:00:00+00:00" + assert updated_record["invalid_datetime"] == "9999-12-31 23:59:59.999999" + assert "Record Message Validation Error" in captured.err + + @pytest.fixture def bench_sink() -> BatchSinkMock: target = TargetMock()