Skip to content

Commit

Permalink
refactor: Use SQLConnector.jsonschema_to_sql to map JSON schema typ…
Browse files Browse the repository at this point in the history
…es to SQL data types (MeltanoLabs#469)

Related:

- meltano/sdk#2732
  • Loading branch information
edgarrmondragon authored and nickjoanis committed Dec 4, 2024
1 parent 7afa4d9 commit c115843
Showing 1 changed file with 64 additions and 45 deletions.
109 changes: 64 additions & 45 deletions target_postgres/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import simplejson
import sqlalchemy as sa
from singer_sdk import SQLConnector
from singer_sdk import typing as th
from singer_sdk.connectors.sql import JSONSchemaToSQL
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, BYTEA, JSONB, UUID
from sqlalchemy.engine import URL
from sqlalchemy.engine.url import make_url
Expand All @@ -30,7 +30,6 @@
TEXT,
TIME,
TIMESTAMP,
VARCHAR,
TypeDecorator,
)
from sshtunnel import SSHTunnelForwarder
Expand All @@ -39,6 +38,22 @@
from singer_sdk.connectors.sql import FullyQualifiedName


class JSONSchemaToPostgres(JSONSchemaToSQL):
"""Convert JSON Schema types to Postgres types."""

def __init__(self, *, content_encoding: bool = True) -> None:
"""Initialize the JSONSchemaToPostgres instance."""
super().__init__()
self.content_encoding = content_encoding

def handle_raw_string(self, schema):
"""Handle a raw string type."""
if self.content_encoding and schema.get("contentEncoding") == "base16":
return HexByteString()

return TEXT()


class PostgresConnector(SQLConnector):
"""Sets up SQL Alchemy, and other Postgres related stuff."""

Expand Down Expand Up @@ -223,7 +238,50 @@ def clone_table(
new_table.create(bind=connection)
return new_table

def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: # type: ignore[override]
def _handle_array_type(self, jsonschema: dict) -> ARRAY | JSONB:
"""Handle array type."""
items = jsonschema.get("items")
# Case 1: items is a string
if isinstance(items, str):
return ARRAY(self.to_sql_type({"type": items}))

# Case 2: items are more complex
if isinstance(items, dict):
# Case 2.1: items are variants
if "type" not in items:
return ARRAY(JSONB())

items_type = items["type"]

# Case 2.2: items are a single type
if isinstance(items_type, str):
return ARRAY(self.to_sql_type({"type": items_type}))

# Case 2.3: items are a list of types
if isinstance(items_type, list):
return ARRAY(self.to_sql_type({"type": items_type}))

# Case 3: tuples
return ARRAY(JSONB()) if isinstance(items, list) else JSONB()

@cached_property
def jsonschema_to_sql(self) -> JSONSchemaToSQL:
"""Return a JSONSchemaToSQL instance with custom type handling."""
to_sql = JSONSchemaToPostgres(content_encoding=self.interpret_content_encoding)
to_sql.fallback_type = TEXT
to_sql.register_type_handler("integer", BIGINT)
to_sql.register_type_handler("object", JSONB)
to_sql.register_type_handler("array", self._handle_array_type)
to_sql.register_format_handler("date-time", TIMESTAMP)
to_sql.register_format_handler("uuid", UUID)
to_sql.register_format_handler("email", TEXT)
to_sql.register_format_handler("uri", TEXT)
to_sql.register_format_handler("hostname", TEXT)
to_sql.register_format_handler("ipv4", TEXT)
to_sql.register_format_handler("ipv6", TEXT)
return to_sql

def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine:
"""Return a JSON Schema representation of the provided type.
By default will call `typing.to_sql_type()`.
Expand Down Expand Up @@ -279,7 +337,7 @@ def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: # type: ig

return PostgresConnector.pick_best_sql_type(sql_type_array=sql_type_array)

def pick_individual_type(self, jsonschema_type: dict): # noqa: PLR0911
def pick_individual_type(self, jsonschema_type: dict):
"""Select the correct sql type assuming jsonschema_type has only a single type.
Args:
Expand All @@ -290,47 +348,8 @@ def pick_individual_type(self, jsonschema_type: dict): # noqa: PLR0911
"""
if "null" in jsonschema_type["type"]:
return None
if "integer" in jsonschema_type["type"]:
return BIGINT()
if "object" in jsonschema_type["type"]:
return JSONB()
if "array" in jsonschema_type["type"]:
items = jsonschema_type.get("items")
# Case 1: items is a string
if isinstance(items, str):
return ARRAY(self.to_sql_type({"type": items}))

# Case 2: items are more complex
if isinstance(items, dict):
# Case 2.1: items are variants
if "type" not in items:
return ARRAY(JSONB())

items_type = items["type"]

# Case 2.2: items are a single type
if isinstance(items_type, str):
return ARRAY(self.to_sql_type({"type": items_type}))

# Case 2.3: items are a list of types
if isinstance(items_type, list):
return ARRAY(self.to_sql_type({"type": items_type}))

# Case 3: tuples
return ARRAY(JSONB()) if isinstance(items, list) else JSONB()

# string formats
if jsonschema_type.get("format") == "date-time":
return TIMESTAMP()
if jsonschema_type.get("format") == "uuid":
return UUID()
if (
self.interpret_content_encoding
and jsonschema_type.get("contentEncoding") == "base16"
):
return HexByteString()
individual_type = th.to_sql_type(jsonschema_type)
return TEXT() if isinstance(individual_type, VARCHAR) else individual_type

return self.jsonschema_to_sql.to_sql_type(jsonschema_type)

@staticmethod
def pick_best_sql_type(sql_type_array: list):
Expand Down

0 comments on commit c115843

Please sign in to comment.