Skip to content

Commit

Permalink
feat(targets): SQL target developers can now more easily override the…
Browse files Browse the repository at this point in the history
… mapping from JSON schema to SQL column type (#2732)

* feat(target): SQL target developers can now more easily override the mapping from JSON schema to SQL column type

* Add failing test for complex type case

* Correctly handle multiple types

* Make mypy happy

* Test more cases

* Add docs

* Fix annotation in docs

* Support custom fallbacks

* Add a way to customize raw string handling
  • Loading branch information
edgarrmondragon committed Nov 11, 2024
1 parent 2510d9e commit 3bae0ff
Show file tree
Hide file tree
Showing 9 changed files with 505 additions and 6 deletions.
8 changes: 8 additions & 0 deletions docs/classes/singer_sdk.connectors.sql.JSONSchemaToSQL.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
singer_sdk.connectors.sql.JSONSchemaToSQL
=========================================

.. currentmodule:: singer_sdk.connectors.sql

.. autoclass:: JSONSchemaToSQL
:members:
:special-members: __init__, __call__
1 change: 1 addition & 0 deletions docs/guides/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ pagination-classes
custom-clis
config-schema
sql-tap
sql-target
```
52 changes: 52 additions & 0 deletions docs/guides/sql-target.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# Building SQL targets

## Mapping JSON Schema to SQL types

Starting with version `0.42.0`, the Meltano Singer SDK provides a clean way to map JSON Schema to SQL types. This is useful when the SQL dialect needs to do special handling for certain JSON Schema types.

### Custom JSON Schema mapping

If the default [`JSONSchemaToSQL`](connectors.sql.JSONSchemaToSQL) instance doesn't cover all the types supported by the SQLAlchemy dialect in your target, you can override the {attr}`SQLConnector.jsonschema_to_sql <singer_sdk.SQLConnector.jsonschema_to_sql>` property and register a new type handler for the type you need to support:

```python
import functools

import sqlalchemy as sa
from singer_sdk import typing as th
from singer_sdk.connectors import JSONSchemaToSQL, SQLConnector

from my_sqlalchemy_dialect import VectorType


def custom_array_to_sql(jsonschema: dict) -> VectorType | sa.types.VARCHAR:
"""Custom mapping for arrays of numbers."""
if items := jsonschema.get("items"):
if items.get("type") == "number":
return VectorType()

return sa.types.VARCHAR()


class MyConnector(SQLConnector):
@functools.cached_property
def jsonschema_to_sql(self):
to_sql = JSONSchemaToSQL()
to_sql.register_type_handler("array", custom_array_to_sql)
return to_sql
```

### Custom string format mapping

You can also register a new format handler for custom string formats:

```python
from my_sqlalchemy_dialect import URI


class MyConnector(SQLConnector):
@functools.cached_property
def jsonschema_to_sql(self):
to_sql = JSONSchemaToSQL()
to_sql.register_format_handler("uri", URI)
return to_sql
```
1 change: 1 addition & 0 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,4 @@ Other
:template: class.rst

connectors.sql.SQLToJSONSchema
connectors.sql.JSONSchemaToSQL
241 changes: 238 additions & 3 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@
else:
from warnings import deprecated

if sys.version_info < (3, 10):
from typing_extensions import TypeAlias
else:
from typing import TypeAlias # noqa: ICN003

if t.TYPE_CHECKING:
from sqlalchemy.engine import Engine
from sqlalchemy.engine.reflection import Inspector
Expand Down Expand Up @@ -192,6 +197,227 @@ def boolean_to_jsonschema(self, column_type: sa.types.Boolean) -> dict: # noqa:
return th.BooleanType.type_dict # type: ignore[no-any-return]


JSONtoSQLHandler: TypeAlias = t.Union[
t.Type[sa.types.TypeEngine],
t.Callable[[dict], sa.types.TypeEngine],
]


class JSONSchemaToSQL:
"""A configurable mapper for converting JSON Schema types to SQLAlchemy types.
This class provides a mapping from JSON Schema types to SQLAlchemy types.
.. versionadded:: 0.42.0
"""

def __init__(self) -> None:
"""Initialize the mapper with default type mappings."""
# Default type mappings
self._type_mapping: dict[str, JSONtoSQLHandler] = {
"string": self._handle_string_type,
"integer": sa.types.INTEGER,
"number": sa.types.DECIMAL,
"boolean": sa.types.BOOLEAN,
"object": sa.types.VARCHAR,
"array": sa.types.VARCHAR,
}

# Format handlers for string types
self._format_handlers: dict[str, JSONtoSQLHandler] = {
# Default date-like formats
"date-time": sa.types.DATETIME,
"time": sa.types.TIME,
"date": sa.types.DATE,
# Common string formats with sensible defaults
"uuid": sa.types.UUID,
"email": lambda _: sa.types.VARCHAR(254), # RFC 5321
"uri": lambda _: sa.types.VARCHAR(2083), # Common browser limit
"hostname": lambda _: sa.types.VARCHAR(253), # RFC 1035
"ipv4": lambda _: sa.types.VARCHAR(15),
"ipv6": lambda _: sa.types.VARCHAR(45),
}

self._fallback_type: type[sa.types.TypeEngine] = sa.types.VARCHAR

def _invoke_handler( # noqa: PLR6301
self,
handler: JSONtoSQLHandler,
schema: dict,
) -> sa.types.TypeEngine:
"""Invoke a handler, handling both type classes and callables.
Args:
handler: The handler to invoke.
schema: The schema to pass to callable handlers.
Returns:
The resulting SQLAlchemy type.
"""
if isinstance(handler, type):
return handler() # type: ignore[no-any-return]
return handler(schema)

@property
def fallback_type(self) -> type[sa.types.TypeEngine]:
"""Return the fallback type.
Returns:
The fallback type.
"""
return self._fallback_type

@fallback_type.setter
def fallback_type(self, value: type[sa.types.TypeEngine]) -> None:
"""Set the fallback type.
Args:
value: The new fallback type.
"""
self._fallback_type = value

def register_type_handler(self, json_type: str, handler: JSONtoSQLHandler) -> None:
"""Register a custom type handler.
Args:
json_type: The JSON Schema type to handle.
handler: Either a SQLAlchemy type class or a callable that takes a schema
dict and returns a SQLAlchemy type instance.
"""
self._type_mapping[json_type] = handler

def register_format_handler(
self,
format_name: str,
handler: JSONtoSQLHandler,
) -> None:
"""Register a custom format handler.
Args:
format_name: The format string (e.g., "date-time", "email", "custom-format").
handler: Either a SQLAlchemy type class or a callable that takes a schema
dict and returns a SQLAlchemy type instance.
""" # noqa: E501
self._format_handlers[format_name] = handler

def handle_multiple_types(self, types: t.Sequence[str]) -> sa.types.TypeEngine: # noqa: ARG002, PLR6301
"""Handle multiple types by returning a VARCHAR.
Args:
types: The list of types to handle.
Returns:
A VARCHAR type.
"""
return sa.types.VARCHAR()

def handle_raw_string(self, schema: dict) -> sa.types.TypeEngine: # noqa: PLR6301
"""Handle a string type generically.
Args:
schema: The JSON Schema object.
Returns:
Appropriate SQLAlchemy type.
"""
max_length: int | None = schema.get("maxLength")
return sa.types.VARCHAR(max_length)

def _get_type_from_schema(self, schema: dict) -> sa.types.TypeEngine | None:
"""Try to get a SQL type from a single schema object.
Args:
schema: The JSON Schema object.
Returns:
SQL type if one can be determined, None otherwise.
"""
# Check if this is a string with format first
if schema.get("type") == "string" and "format" in schema:
format_type = self._handle_format(schema)
if format_type is not None:
return format_type

# Then check regular types
if schema_type := schema.get("type"):
if isinstance(schema_type, (list, tuple)):
# Filter out null type if present
non_null_types = [t for t in schema_type if t != "null"]

# If we have multiple non-null types, use VARCHAR
if len(non_null_types) > 1:
self.handle_multiple_types(non_null_types)

# If we have exactly one non-null type, use its handler
if len(non_null_types) == 1 and non_null_types[0] in self._type_mapping:
handler = self._type_mapping[non_null_types[0]]
return self._invoke_handler(handler, schema)

elif type_handler := self._type_mapping.get(schema_type):
return self._invoke_handler(type_handler, schema)

return None

def _handle_format(self, schema: dict) -> sa.types.TypeEngine | None:
"""Handle format-specific type conversion.
Args:
schema: The JSON Schema object.
Returns:
The format-specific SQL type if applicable, None otherwise.
"""
if "format" not in schema:
return None

format_string: str = schema["format"]

if handler := self._format_handlers.get(format_string):
return self._invoke_handler(handler, schema)

return None

def _handle_string_type(self, schema: dict) -> sa.types.TypeEngine:
"""Handle string type conversion with special cases for formats.
Args:
schema: The JSON Schema object.
Returns:
Appropriate SQLAlchemy type.
"""
# Check for format-specific handling first
if format_type := self._handle_format(schema):
return format_type

return self.handle_raw_string(schema)

def to_sql_type(self, schema: dict) -> sa.types.TypeEngine:
"""Convert a JSON Schema type definition to a SQLAlchemy type.
Args:
schema: The JSON Schema object.
Returns:
The corresponding SQLAlchemy type.
"""
if sql_type := self._get_type_from_schema(schema):
return sql_type

# Handle anyOf
if "anyOf" in schema:
for subschema in schema["anyOf"]:
# Skip null types in anyOf
if subschema.get("type") == "null":
continue

if sql_type := self._get_type_from_schema(subschema):
return sql_type

# Fallback
return self.fallback_type()


class SQLConnector: # noqa: PLR0904
"""Base class for SQLAlchemy-based connectors.
Expand Down Expand Up @@ -255,6 +481,16 @@ def sql_to_jsonschema(self) -> SQLToJSONSchema:
"""
return SQLToJSONSchema()

@functools.cached_property
def jsonschema_to_sql(self) -> JSONSchemaToSQL:
"""The JSON-to-SQL type mapper object for this SQL connector.
Override this property to provide a custom mapping for your SQL dialect.
.. versionadded:: 0.42.0
"""
return JSONSchemaToSQL()

@contextmanager
def _connect(self) -> t.Iterator[sa.engine.Connection]:
with self._engine.connect().execution_options(stream_results=True) as conn:
Expand Down Expand Up @@ -418,8 +654,7 @@ def to_jsonschema_type(
msg = f"Unexpected type received: '{type(sql_type).__name__}'"
raise ValueError(msg)

@staticmethod
def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine:
def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine:
"""Return a JSON Schema representation of the provided type.
By default will call `typing.to_sql_type()`.
Expand All @@ -435,7 +670,7 @@ def to_sql_type(jsonschema_type: dict) -> sa.types.TypeEngine:
Returns:
The SQLAlchemy type representation of the data type.
"""
return th.to_sql_type(jsonschema_type)
return self.jsonschema_to_sql.to_sql_type(jsonschema_type)

@staticmethod
def get_fully_qualified_name(
Expand Down
4 changes: 4 additions & 0 deletions singer_sdk/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1216,6 +1216,10 @@ def _jsonschema_type_check(jsonschema_type: dict, type_check: tuple[str]) -> boo
)


@deprecated(
"Use `JSONSchemaToSQL` instead.",
category=DeprecationWarning,
)
def to_sql_type( # noqa: PLR0911, C901
jsonschema_type: dict,
) -> sa.types.TypeEngine:
Expand Down
Loading

0 comments on commit 3bae0ff

Please sign in to comment.