From 3bae0ffdbcaee444b3a210b5d00e93e1aa3f32db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= <16805946+edgarrmondragon@users.noreply.github.com> Date: Wed, 6 Nov 2024 17:05:33 -0600 Subject: [PATCH] feat(targets): SQL target developers can now more easily override the mapping from JSON schema to SQL column type (#2732) * feat(target): SQL target developers can now more easily override the mapping from JSON schema to SQL column type * Add failing test for complex type case * Correctly handle multiple types * Make mypy happy * Test more cases * Add docs * Fix annotation in docs * Support custom fallbacks * Add a way to customize raw string handling --- ...ger_sdk.connectors.sql.JSONSchemaToSQL.rst | 8 + docs/guides/index.md | 1 + docs/guides/sql-target.md | 52 ++++ docs/reference.rst | 1 + singer_sdk/connectors/sql.py | 241 +++++++++++++++++- singer_sdk/typing.py | 4 + tests/core/test_connector_sql.py | 200 ++++++++++++++- tests/core/test_sql_typing.py | 3 +- tests/core/test_typing.py | 1 + 9 files changed, 505 insertions(+), 6 deletions(-) create mode 100644 docs/classes/singer_sdk.connectors.sql.JSONSchemaToSQL.rst create mode 100644 docs/guides/sql-target.md diff --git a/docs/classes/singer_sdk.connectors.sql.JSONSchemaToSQL.rst b/docs/classes/singer_sdk.connectors.sql.JSONSchemaToSQL.rst new file mode 100644 index 000000000..82ad1c544 --- /dev/null +++ b/docs/classes/singer_sdk.connectors.sql.JSONSchemaToSQL.rst @@ -0,0 +1,8 @@ +singer_sdk.connectors.sql.JSONSchemaToSQL +========================================= + +.. currentmodule:: singer_sdk.connectors.sql + +.. autoclass:: JSONSchemaToSQL + :members: + :special-members: __init__, __call__ \ No newline at end of file diff --git a/docs/guides/index.md b/docs/guides/index.md index 60a94e8d5..f754908a2 100644 --- a/docs/guides/index.md +++ b/docs/guides/index.md @@ -10,4 +10,5 @@ pagination-classes custom-clis config-schema sql-tap +sql-target ``` diff --git a/docs/guides/sql-target.md b/docs/guides/sql-target.md new file mode 100644 index 000000000..7a9930a2a --- /dev/null +++ b/docs/guides/sql-target.md @@ -0,0 +1,52 @@ +# Building SQL targets + +## Mapping JSON Schema to SQL types + +Starting with version `0.42.0`, the Meltano Singer SDK provides a clean way to map JSON Schema to SQL types. This is useful when the SQL dialect needs to do special handling for certain JSON Schema types. + +### Custom JSON Schema mapping + +If the default [`JSONSchemaToSQL`](connectors.sql.JSONSchemaToSQL) instance doesn't cover all the types supported by the SQLAlchemy dialect in your target, you can override the {attr}`SQLConnector.jsonschema_to_sql ` property and register a new type handler for the type you need to support: + +```python +import functools + +import sqlalchemy as sa +from singer_sdk import typing as th +from singer_sdk.connectors import JSONSchemaToSQL, SQLConnector + +from my_sqlalchemy_dialect import VectorType + + +def custom_array_to_sql(jsonschema: dict) -> VectorType | sa.types.VARCHAR: + """Custom mapping for arrays of numbers.""" + if items := jsonschema.get("items"): + if items.get("type") == "number": + return VectorType() + + return sa.types.VARCHAR() + + +class MyConnector(SQLConnector): + @functools.cached_property + def jsonschema_to_sql(self): + to_sql = JSONSchemaToSQL() + to_sql.register_type_handler("array", custom_array_to_sql) + return to_sql +``` + +### Custom string format mapping + +You can also register a new format handler for custom string formats: + +```python +from my_sqlalchemy_dialect import URI + + +class MyConnector(SQLConnector): + @functools.cached_property + def jsonschema_to_sql(self): + to_sql = JSONSchemaToSQL() + to_sql.register_format_handler("uri", URI) + return to_sql +``` diff --git a/docs/reference.rst b/docs/reference.rst index eeaf1b53a..71e0d6ddb 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -152,3 +152,4 @@ Other :template: class.rst connectors.sql.SQLToJSONSchema + connectors.sql.JSONSchemaToSQL diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 98e6ea7d6..d3c8ce248 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -25,6 +25,11 @@ else: from warnings import deprecated +if sys.version_info < (3, 10): + from typing_extensions import TypeAlias +else: + from typing import TypeAlias # noqa: ICN003 + if t.TYPE_CHECKING: from sqlalchemy.engine import Engine from sqlalchemy.engine.reflection import Inspector @@ -192,6 +197,227 @@ def boolean_to_jsonschema(self, column_type: sa.types.Boolean) -> dict: # noqa: return th.BooleanType.type_dict # type: ignore[no-any-return] +JSONtoSQLHandler: TypeAlias = t.Union[ + t.Type[sa.types.TypeEngine], + t.Callable[[dict], sa.types.TypeEngine], +] + + +class JSONSchemaToSQL: + """A configurable mapper for converting JSON Schema types to SQLAlchemy types. + + This class provides a mapping from JSON Schema types to SQLAlchemy types. + + .. versionadded:: 0.42.0 + """ + + def __init__(self) -> None: + """Initialize the mapper with default type mappings.""" + # Default type mappings + self._type_mapping: dict[str, JSONtoSQLHandler] = { + "string": self._handle_string_type, + "integer": sa.types.INTEGER, + "number": sa.types.DECIMAL, + "boolean": sa.types.BOOLEAN, + "object": sa.types.VARCHAR, + "array": sa.types.VARCHAR, + } + + # Format handlers for string types + self._format_handlers: dict[str, JSONtoSQLHandler] = { + # Default date-like formats + "date-time": sa.types.DATETIME, + "time": sa.types.TIME, + "date": sa.types.DATE, + # Common string formats with sensible defaults + "uuid": sa.types.UUID, + "email": lambda _: sa.types.VARCHAR(254), # RFC 5321 + "uri": lambda _: sa.types.VARCHAR(2083), # Common browser limit + "hostname": lambda _: sa.types.VARCHAR(253), # RFC 1035 + "ipv4": lambda _: sa.types.VARCHAR(15), + "ipv6": lambda _: sa.types.VARCHAR(45), + } + + self._fallback_type: type[sa.types.TypeEngine] = sa.types.VARCHAR + + def _invoke_handler( # noqa: PLR6301 + self, + handler: JSONtoSQLHandler, + schema: dict, + ) -> sa.types.TypeEngine: + """Invoke a handler, handling both type classes and callables. + + Args: + handler: The handler to invoke. + schema: The schema to pass to callable handlers. + + Returns: + The resulting SQLAlchemy type. + """ + if isinstance(handler, type): + return handler() # type: ignore[no-any-return] + return handler(schema) + + @property + def fallback_type(self) -> type[sa.types.TypeEngine]: + """Return the fallback type. + + Returns: + The fallback type. + """ + return self._fallback_type + + @fallback_type.setter + def fallback_type(self, value: type[sa.types.TypeEngine]) -> None: + """Set the fallback type. + + Args: + value: The new fallback type. + """ + self._fallback_type = value + + def register_type_handler(self, json_type: str, handler: JSONtoSQLHandler) -> None: + """Register a custom type handler. + + Args: + json_type: The JSON Schema type to handle. + handler: Either a SQLAlchemy type class or a callable that takes a schema + dict and returns a SQLAlchemy type instance. + """ + self._type_mapping[json_type] = handler + + def register_format_handler( + self, + format_name: str, + handler: JSONtoSQLHandler, + ) -> None: + """Register a custom format handler. + + Args: + format_name: The format string (e.g., "date-time", "email", "custom-format"). + handler: Either a SQLAlchemy type class or a callable that takes a schema + dict and returns a SQLAlchemy type instance. + """ # noqa: E501 + self._format_handlers[format_name] = handler + + def handle_multiple_types(self, types: t.Sequence[str]) -> sa.types.TypeEngine: # noqa: ARG002, PLR6301 + """Handle multiple types by returning a VARCHAR. + + Args: + types: The list of types to handle. + + Returns: + A VARCHAR type. + """ + return sa.types.VARCHAR() + + def handle_raw_string(self, schema: dict) -> sa.types.TypeEngine: # noqa: PLR6301 + """Handle a string type generically. + + Args: + schema: The JSON Schema object. + + Returns: + Appropriate SQLAlchemy type. + """ + max_length: int | None = schema.get("maxLength") + return sa.types.VARCHAR(max_length) + + def _get_type_from_schema(self, schema: dict) -> sa.types.TypeEngine | None: + """Try to get a SQL type from a single schema object. + + Args: + schema: The JSON Schema object. + + Returns: + SQL type if one can be determined, None otherwise. + """ + # Check if this is a string with format first + if schema.get("type") == "string" and "format" in schema: + format_type = self._handle_format(schema) + if format_type is not None: + return format_type + + # Then check regular types + if schema_type := schema.get("type"): + if isinstance(schema_type, (list, tuple)): + # Filter out null type if present + non_null_types = [t for t in schema_type if t != "null"] + + # If we have multiple non-null types, use VARCHAR + if len(non_null_types) > 1: + self.handle_multiple_types(non_null_types) + + # If we have exactly one non-null type, use its handler + if len(non_null_types) == 1 and non_null_types[0] in self._type_mapping: + handler = self._type_mapping[non_null_types[0]] + return self._invoke_handler(handler, schema) + + elif type_handler := self._type_mapping.get(schema_type): + return self._invoke_handler(type_handler, schema) + + return None + + def _handle_format(self, schema: dict) -> sa.types.TypeEngine | None: + """Handle format-specific type conversion. + + Args: + schema: The JSON Schema object. + + Returns: + The format-specific SQL type if applicable, None otherwise. + """ + if "format" not in schema: + return None + + format_string: str = schema["format"] + + if handler := self._format_handlers.get(format_string): + return self._invoke_handler(handler, schema) + + return None + + def _handle_string_type(self, schema: dict) -> sa.types.TypeEngine: + """Handle string type conversion with special cases for formats. + + Args: + schema: The JSON Schema object. + + Returns: + Appropriate SQLAlchemy type. + """ + # Check for format-specific handling first + if format_type := self._handle_format(schema): + return format_type + + return self.handle_raw_string(schema) + + def to_sql_type(self, schema: dict) -> sa.types.TypeEngine: + """Convert a JSON Schema type definition to a SQLAlchemy type. + + Args: + schema: The JSON Schema object. + + Returns: + The corresponding SQLAlchemy type. + """ + if sql_type := self._get_type_from_schema(schema): + return sql_type + + # Handle anyOf + if "anyOf" in schema: + for subschema in schema["anyOf"]: + # Skip null types in anyOf + if subschema.get("type") == "null": + continue + + if sql_type := self._get_type_from_schema(subschema): + return sql_type + + # Fallback + return self.fallback_type() + + class SQLConnector: # noqa: PLR0904 """Base class for SQLAlchemy-based connectors. @@ -255,6 +481,16 @@ def sql_to_jsonschema(self) -> SQLToJSONSchema: """ return SQLToJSONSchema() + @functools.cached_property + def jsonschema_to_sql(self) -> JSONSchemaToSQL: + """The JSON-to-SQL type mapper object for this SQL connector. + + Override this property to provide a custom mapping for your SQL dialect. + + .. versionadded:: 0.42.0 + """ + return JSONSchemaToSQL() + @contextmanager def _connect(self) -> t.Iterator[sa.engine.Connection]: with self._engine.connect().execution_options(stream_results=True) as conn: @@ -418,8 +654,7 @@ def to_jsonschema_type( msg = f"Unexpected type received: '{type(sql_type).__name__}'" raise ValueError(msg) - @staticmethod - def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine: + def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: """Return a JSON Schema representation of the provided type. By default will call `typing.to_sql_type()`. @@ -435,7 +670,7 @@ def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine: Returns: The SQLAlchemy type representation of the data type. """ - return th.to_sql_type(jsonschema_type) + return self.jsonschema_to_sql.to_sql_type(jsonschema_type) @staticmethod def get_fully_qualified_name( diff --git a/singer_sdk/typing.py b/singer_sdk/typing.py index 5ee302948..fb22f4e82 100644 --- a/singer_sdk/typing.py +++ b/singer_sdk/typing.py @@ -1216,6 +1216,10 @@ def _jsonschema_type_check(jsonschema_type: dict, type_check: tuple[str]) -> boo ) +@deprecated( + "Use `JSONSchemaToSQL` instead.", + category=DeprecationWarning, +) def to_sql_type( # noqa: PLR0911, C901 jsonschema_type: dict, ) -> sa.types.TypeEngine: diff --git a/tests/core/test_connector_sql.py b/tests/core/test_connector_sql.py index 94bce926e..66637e9da 100644 --- a/tests/core/test_connector_sql.py +++ b/tests/core/test_connector_sql.py @@ -11,7 +11,11 @@ from samples.sample_duckdb import DuckDBConnector from singer_sdk.connectors import SQLConnector -from singer_sdk.connectors.sql import FullyQualifiedName, SQLToJSONSchema +from singer_sdk.connectors.sql import ( + FullyQualifiedName, + JSONSchemaToSQL, + SQLToJSONSchema, +) from singer_sdk.exceptions import ConfigValidationError if t.TYPE_CHECKING: @@ -445,7 +449,7 @@ def test_sql_to_json_schema_map( assert m.to_jsonschema(sql_type) == expected_jsonschema_type -def test_custom_type(): +def test_custom_type_to_jsonschema(): class MyMap(SQLToJSONSchema): @SQLToJSONSchema.to_jsonschema.register def custom_number_to_jsonschema(self, column_type: sa.types.NUMERIC) -> dict: @@ -470,3 +474,195 @@ def my_type_to_jsonschema(self, column_type) -> dict: # noqa: ARG002 "multipleOf": 0.01, } assert m.to_jsonschema(sa.types.BOOLEAN()) == {"type": ["boolean"]} + + +class TestJSONSchemaToSQL: # noqa: PLR0904 + @pytest.fixture + def json_schema_to_sql(self) -> JSONSchemaToSQL: + return JSONSchemaToSQL() + + def test_register_jsonschema_type_handler( + self, + json_schema_to_sql: JSONSchemaToSQL, + ): + json_schema_to_sql.register_type_handler("my-type", sa.types.LargeBinary) + result = json_schema_to_sql.to_sql_type({"type": "my-type"}) + assert isinstance(result, sa.types.LargeBinary) + + def test_register_jsonschema_format_handler( + self, + json_schema_to_sql: JSONSchemaToSQL, + ): + json_schema_to_sql.register_format_handler("my-format", sa.types.LargeBinary) + result = json_schema_to_sql.to_sql_type( + { + "type": "string", + "format": "my-format", + } + ) + assert isinstance(result, sa.types.LargeBinary) + + def test_string(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": ["string", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.VARCHAR) + assert result.length is None + + def test_string_max_length(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": ["string", "null"], "maxLength": 10} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance( + json_schema_to_sql.to_sql_type(jsonschema_type), + sa.types.VARCHAR, + ) + assert result.length == 10 + + def test_integer(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": ["integer", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.INTEGER) + + def test_number(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": ["number", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.DECIMAL) + + def test_boolean(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": ["boolean", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.BOOLEAN) + + def test_object(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": "object", "properties": {}} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.VARCHAR) + + def test_array(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": "array"} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.VARCHAR) + + def test_array_items(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": "array", "items": {"type": "string"}} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.VARCHAR) + + def test_date(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"format": "date", "type": ["string", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.DATE) + + def test_time(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"format": "time", "type": ["string", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.TIME) + + def test_uuid(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"format": "uuid", "type": ["string", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.UUID) + + def test_datetime(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"format": "date-time", "type": ["string", "null"]} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.DATETIME) + + def test_anyof_datetime(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = { + "anyOf": [ + {"type": "string", "format": "date-time"}, + {"type": "null"}, + ], + } + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.DATETIME) + + def test_anyof_integer(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = { + "anyOf": [ + {"type": "null"}, + {"type": "integer"}, + ], + } + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.INTEGER) + + def test_anyof_unknown(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = { + "anyOf": [ + {"type": "null"}, + {"type": "unknown"}, + ], + } + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.VARCHAR) + + @pytest.mark.parametrize( + "jsonschema_type,expected_type", + [ + pytest.param( + {"type": ["array", "object", "boolean", "null"]}, + sa.types.VARCHAR, + id="array-first", + ), + pytest.param( + {"type": ["boolean", "array", "object", "null"]}, + sa.types.VARCHAR, + id="boolean-first", + ), + ], + ) + def test_complex( + self, + json_schema_to_sql: JSONSchemaToSQL, + jsonschema_type: dict, + expected_type: type[sa.types.TypeEngine], + ): + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, expected_type) + + def test_unknown_type(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"cannot": "compute"} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.VARCHAR) + + def test_unknown_format(self, json_schema_to_sql: JSONSchemaToSQL): + jsonschema_type = {"type": "string", "format": "unknown"} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.VARCHAR) + + def test_custom_fallback(self): + json_schema_to_sql = JSONSchemaToSQL() + json_schema_to_sql.fallback_type = sa.types.CHAR + jsonschema_type = {"cannot": "compute"} + result = json_schema_to_sql.to_sql_type(jsonschema_type) + assert isinstance(result, sa.types.CHAR) + + def test_custom_handle_raw_string(self): + class CustomJSONSchemaToSQL(JSONSchemaToSQL): + def handle_raw_string(self, schema): + if schema.get("contentMediaType") == "image/png": + return sa.types.LargeBinary() + + return super().handle_raw_string(schema) + + json_schema_to_sql = CustomJSONSchemaToSQL() + + vanilla = {"type": ["string"]} + result = json_schema_to_sql.to_sql_type(vanilla) + assert isinstance(result, sa.types.VARCHAR) + + non_image_type = { + "type": "string", + "contentMediaType": "text/html", + } + result = json_schema_to_sql.to_sql_type(non_image_type) + assert isinstance(result, sa.types.VARCHAR) + + image_type = { + "type": "string", + "contentEncoding": "base64", + "contentMediaType": "image/png", + } + result = json_schema_to_sql.to_sql_type(image_type) + assert isinstance(result, sa.types.LargeBinary) diff --git a/tests/core/test_sql_typing.py b/tests/core/test_sql_typing.py index 5662d7d0d..8b6ff1a96 100644 --- a/tests/core/test_sql_typing.py +++ b/tests/core/test_sql_typing.py @@ -49,7 +49,8 @@ def test_convert_jsonschema_type_to_sql_type( jsonschema_type: dict, sql_type: sa.types.TypeEngine, ): - result = th.to_sql_type(jsonschema_type) + with pytest.warns(DeprecationWarning): + result = th.to_sql_type(jsonschema_type) assert isinstance(result, sql_type.__class__) assert str(result) == str(sql_type) diff --git a/tests/core/test_typing.py b/tests/core/test_typing.py index 219c9a587..c6919a81d 100644 --- a/tests/core/test_typing.py +++ b/tests/core/test_typing.py @@ -325,6 +325,7 @@ def test_conform_primitives(): assert _conform_primitive_property(1, {"type": ["boolean"]}) is True +@pytest.mark.filterwarnings("ignore:Use `JSONSchemaToSQL` instead.:DeprecationWarning") @pytest.mark.parametrize( "jsonschema_type,expected", [