Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add configuration to store enums as pg custom domain types #292

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,41 @@ Example data:
```

For convenience, data prefixed with `0x` or containing an odd number of characters is supported although it's not part of the standard.

## String enum support

This target can store columns with the json-schema "enum" type as a pg DOMAIN type instead of a TEXT column. This can save space and improve performance.
prevostc marked this conversation as resolved.
Show resolved Hide resolved

It is an opt-in feature because it might result in data loss if the actual data does not match the schema's advertised encoding.

Please consider these several downsides to take into consideration before activating this feature:
- it changes the sort behavior of the resulting column
- string operations will not be available
- portability of the data is reduced
- it is not possible to add remove or modify the enum values
prevostc marked this conversation as resolved.
Show resolved Hide resolved
- enums are not shared accross tables, each column get his own custom type

To enable it, set the `storage_optimized_enum` option to `True`.

Example schema:
```json
{
"type": "object",
"properties": {
"my_enum": {
"type": "string",
"enum": ["foo", "bar", "baz"]
}
}
}
```

Data will be stored as a custom domain type in the database. The domain name will be `enum_<unique_hash>_<column_name>`. The domain will be created in the same schema as the table.

Example generated SQL:
```sql
CREATE TYPE enum_123456_my_enum AS ENUM ('foo', 'bar', 'baz');
CREATE TABLE my_table (
my_enum enum_123456_my_enum
);
```
156 changes: 124 additions & 32 deletions target_postgres/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import atexit
import hashlib
import io
import signal
import typing as t
Expand All @@ -16,7 +17,7 @@
import sqlalchemy as sa
from singer_sdk import SQLConnector
from singer_sdk import typing as th
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, BYTEA, JSONB
from sqlalchemy.dialects.postgresql import ARRAY, BIGINT, BYTEA, ENUM, JSONB
from sqlalchemy.engine import URL
from sqlalchemy.engine.url import make_url
from sqlalchemy.types import (
Expand Down Expand Up @@ -92,6 +93,29 @@ def interpret_content_encoding(self) -> bool:
"""
return self.config.get("interpret_content_encoding", False)

@cached_property
def storage_optimized_enum(self) -> bool:
"""Whether to use storage optimized enum.

Stores columns with the json-schema "enum" type as a pg DOMAIN type
instead of a TEXT column. This can save space and improve performance.

It is an opt-in feature because it might result in data loss if the
actual data does not match the schema's advertised encoding.

Please consider these several downsides to take into consideration
before activating:
- it changes the sort behavior of the resulting column
- string operations will not be available
- portability of the data is reduced
- it is not possible to add remove or modify the enum values
- enums are not shared accross tables, each column get his own custom type

Returns:
True if the feature is enabled, False otherwise.
"""
return self.config.get("storage_optimized_enum", False)

def prepare_table( # type: ignore[override]
self,
full_table_name: str,
Expand Down Expand Up @@ -146,7 +170,12 @@ def prepare_table( # type: ignore[override]
self.prepare_column(
full_table_name=table.fullname,
column_name=property_name,
sql_type=self.to_sql_type(property_def),
sql_type=self.to_sql_type(
schema_name=schema_name,
table_name=table_name,
property_name=property_name,
jsonschema_type=property_def,
),
connection=connection,
column_object=column_object,
)
Expand All @@ -171,22 +200,38 @@ def copy_table_structure(
Returns:
The new table object.
"""
_, schema_name, table_name = self.parse_full_table_name(full_table_name)
meta = sa.MetaData(schema=schema_name)
new_table: sa.Table
columns = []
_, to_table_schema_name, to_table_name = self.parse_full_table_name(
full_table_name
)
if self.table_exists(full_table_name=full_table_name):
raise RuntimeError("Table already exists")
for column in from_table.columns:
columns.append(column._copy())
if as_temp_table:
new_table = sa.Table(table_name, meta, *columns, prefixes=["TEMPORARY"])
new_table.create(bind=connection)
return new_table
else:
new_table = sa.Table(table_name, meta, *columns)
new_table.create(bind=connection)
return new_table

from_table_meta = sa.MetaData(schema=from_table.schema)
to_table_meta = sa.MetaData(schema=to_table_schema_name)
columns = [
# Make sure this temporary table do not issue a CREATE TYPE
# or DROP TYPE statement for custom enums
sa.Column(
c.name,
ENUM(
*c.type.enums,
name=c.type.name,
metadata=from_table_meta,
create_type=False,
),
)
if isinstance(c.type, ENUM) and hasattr(c.type, "enums")
else c.copy()
for c in from_table.columns
]
new_table = sa.Table(
to_table_name,
to_table_meta,
*columns,
prefixes=["TEMPORARY"] if as_temp_table else [],
)
new_table.create(bind=connection, checkfirst=True)
return new_table

@contextmanager
def _connect(self) -> t.Iterator[sa.engine.Connection]:
Expand All @@ -195,7 +240,7 @@ def _connect(self) -> t.Iterator[sa.engine.Connection]:

def drop_table(self, table: sa.Table, connection: sa.engine.Connection):
"""Drop table data."""
table.drop(bind=connection)
table.drop(bind=connection, checkfirst=True)

def clone_table(
self, new_table_name, table, metadata, connection, temp_table
Expand All @@ -218,7 +263,13 @@ def clone_table(
new_table.create(bind=connection)
return new_table

def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: # type: ignore[override]
def to_sql_type( # type: ignore[override]
self,
schema_name: str | None,
table_name: str,
property_name: str,
jsonschema_type: dict,
) -> sa.types.TypeEngine:
"""Return a JSON Schema representation of the provided type.

By default will call `typing.to_sql_type()`.
Expand All @@ -229,6 +280,9 @@ def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: # type: ig
from the base class for all unhandled cases.

Args:
schema_name: The name of the target table schema.
table_name: The name of the target table.
property_name: The name of the property.
jsonschema_type: The JSON Schema representation of the source type.

Returns:
Expand All @@ -242,10 +296,12 @@ def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: # type: ig
elif isinstance(jsonschema_type["type"], list):
for entry in jsonschema_type["type"]:
json_type_dict = {"type": entry}
if jsonschema_type.get("format", False):
json_type_dict["format"] = jsonschema_type["format"]
if fmt := jsonschema_type.get("format", False):
json_type_dict["format"] = fmt
if encoding := jsonschema_type.get("contentEncoding", False):
json_type_dict["contentEncoding"] = encoding
if enum := jsonschema_type.get("enum", False):
json_type_dict["enum"] = enum
json_type_array.append(json_type_dict)
else:
msg = "Invalid format for jsonschema type: not str or list."
Expand All @@ -260,16 +316,30 @@ def to_sql_type(self, jsonschema_type: dict) -> sa.types.TypeEngine: # type: ig
return NOTYPE()
sql_type_array = []
for json_type in json_type_array:
picked_type = self.pick_individual_type(jsonschema_type=json_type)
picked_type = self.pick_individual_type(
schema_name=schema_name,
table_name=table_name,
property_name=property_name,
jsonschema_type=json_type,
)
if picked_type is not None:
sql_type_array.append(picked_type)

return PostgresConnector.pick_best_sql_type(sql_type_array=sql_type_array)

def pick_individual_type(self, jsonschema_type: dict):
def pick_individual_type(
self,
schema_name: str | None,
table_name: str,
property_name: str,
jsonschema_type: dict,
):
"""Select the correct sql type assuming jsonschema_type has only a single type.

Args:
schema_name: The name of the target table schema.
table_name: The name of the target table.
property_name: The name of the property.
jsonschema_type: A jsonschema_type array containing only a single type.

Returns:
Expand All @@ -292,6 +362,20 @@ def pick_individual_type(self, jsonschema_type: dict):
and jsonschema_type.get("contentEncoding") == "base16"
):
return HexByteString()
if self.storage_optimized_enum and jsonschema_type.get("enum"):
# make sure that the enum name is unique and that the uniqueness part
# can be determined using the first 71 characters of the enum name
# this is a limitation of postgres type names
hasher = hashlib.md5(usedforsecurity=False)
hasher.update(f"{schema_name}__{table_name}__{property_name}".encode())
hash_str = hasher.hexdigest()
name_hash = hash_str[0:15]
unique_enum_name = f"enum_{name_hash}_{property_name}"

# ensure our enum is created in the correct schema
meta = sa.MetaData(schema=schema_name)
return ENUM(*jsonschema_type["enum"], name=unique_enum_name, metadata=meta)

individual_type = th.to_sql_type(jsonschema_type)
if isinstance(individual_type, VARCHAR):
return TEXT()
Expand All @@ -308,6 +392,7 @@ def pick_best_sql_type(sql_type_array: list):
An instance of the best SQL type class based on defined precedence order.
"""
precedence_order = [
ENUM,
HexByteString,
ARRAY,
JSONB,
Expand Down Expand Up @@ -372,18 +457,21 @@ def create_empty_table( # type: ignore[override]
columns.append(
sa.Column(
property_name,
self.to_sql_type(property_jsonschema),
self.to_sql_type(
schema_name=meta.schema,
table_name=table_name,
property_name=property_name,
jsonschema_type=property_jsonschema,
),
primary_key=is_primary_key,
autoincrement=False, # See: https://github.com/MeltanoLabs/target-postgres/issues/193 # noqa: E501
)
)
if as_temp_table:
new_table = sa.Table(table_name, meta, *columns, prefixes=["TEMPORARY"])
new_table.create(bind=connection)
return new_table

new_table = sa.Table(table_name, meta, *columns)
new_table.create(bind=connection)
new_table = sa.Table(
table_name, meta, *columns, prefixes=["TEMPORARY"] if as_temp_table else []
)
new_table.create(bind=connection, checkfirst=True)
return new_table

def prepare_column(
Expand Down Expand Up @@ -536,15 +624,20 @@ def _adapt_column_type( # type: ignore[override]
current_type_collation = self.remove_collation(current_type)

# Check if the existing column type and the sql type are the same
if str(sql_type) == str(current_type):
cmp_a = str(sql_type)
cmp_b = str(current_type)
if isinstance(sql_type, ENUM) and hasattr(sql_type, "enums"):
cmp_a = "ENUM " + str(sql_type.enums)
if isinstance(current_type, ENUM) and hasattr(current_type, "enums"):
cmp_b = "ENUM " + str(current_type.enums)
if cmp_a == cmp_b:
# The current column and sql type are the same
# Nothing to do
return

# Not the same type, generic type or compatible types
# calling merge_sql_types for assistnace
compatible_sql_type = self.merge_sql_types([current_type, sql_type])

if str(compatible_sql_type) == str(current_type):
# Nothing to do
return
Expand Down Expand Up @@ -794,7 +887,6 @@ def get_table_columns( # type: ignore[override]
"""
inspector = sa.inspect(connection)
columns = inspector.get_columns(table_name, schema_name)

return {
col_meta["name"]: sa.Column(
col_meta["name"],
Expand Down
8 changes: 6 additions & 2 deletions target_postgres/sinks.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ def upsert(
insert_stmt = sa.insert(to_table).from_select(
names=from_table.columns, select=select_stmt
)

connection.execute(insert_stmt)

# Update
Expand Down Expand Up @@ -254,7 +253,12 @@ def column_representation(
columns.append(
sa.Column(
property_name,
self.connector.to_sql_type(property_jsonschema),
self.connector.to_sql_type(
schema_name=self.schema_name,
table_name=self.table_name,
property_name=property_name,
jsonschema_type=property_jsonschema,
),
)
)
return columns
Expand Down
17 changes: 17 additions & 0 deletions target_postgres/target.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,23 @@ def __init__(
"in an error if the data is not encoded as expected."
),
),
th.Property(
"storage_optimized_enum",
th.BooleanType,
default=False,
description=(
"If set to true, the target will store enum values as a custom pg type "
"instead of a text column. This can save space and improve performance,"
" but may make the data harder to query and analyze."
"Please consider these several downsides to take into consideration "
"before activating this feature:"
" it changes the sort behavior of the resulting column,"
" string operations will not be available,"
" portability of the data is reduced,"
" it is not possible to add remove or modify the enum values,"
" enums are not shared accross tables."
),
),
th.Property(
"ssl_enable",
th.BooleanType,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"type":"SCHEMA","stream":"test_storage_optimized_enum_off","schema":{"type":"object","properties":{"id":{"type":"string"},"chain":{"type":"string","enum":["avalanche","ethereum","optimism","polygon"]},"chain_edge_cases":{"type":"string","enum":["AvAlAnChE"," EtH ","a\u005Cb","PŒLŸflôÑ","_éèàÀÇÉ","_-.*$@#%","ù€/\\\n\r\t","'"]}},"required":["id","chain","chain_edge_cases"]},"key_properties":["id"]}
{"type":"RECORD","stream":"test_storage_optimized_enum_off","record":{"id":"test_valid_1","chain":"avalanche","chain_edge_cases":"AvAlAnChE"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_storage_optimized_enum_off","record":{"id":"test_valid_2","chain":"ethereum","chain_edge_cases":" EtH "},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_storage_optimized_enum_off","record":{"id":"test_valid_3","chain":"optimism","chain_edge_cases":"a\\b"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_storage_optimized_enum_off","record":{"id":"test_valid_4","chain":"polygon","chain_edge_cases":"PŒLŸflôÑ"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_storage_optimized_enum_off","record":{"id":"test_valid_5","chain":"avalanche","chain_edge_cases":"_éèàÀÇÉ"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_storage_optimized_enum_off","record":{"id":"test_valid_7","chain":"optimism","chain_edge_cases":"ù€/\\\n\r\t"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_storage_optimized_enum_off","record":{"id":"test_valid_8","chain":"polygon","chain_edge_cases":"'"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"type":"SCHEMA","stream":"test_storage_optimized_enum_off","schema":{"type":"object","properties":{"id":{"type":"string"},"chain":{"type":"string","enum":["avalanche","base","ethereum","optimism","polygon"]},"chain_edge_cases":{"type":"string","enum":["AvAlAnChE","New Base Chain", " EtH ","a\u005Cb","PŒLŸflôÑ","_éèàÀÇÉ","_-.*$@#%","ù€/\\\n\r\t","'"]}},"required":["id","chain","chain_edge_cases"]},"key_properties":["id"]}
{"type":"RECORD","stream":"test_storage_optimized_enum_off","record":{"id":"test_not_in_original_schema","chain":"base","chain_edge_cases":"New Base Chain"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"SCHEMA","stream":"test_storage_optimized_enum_off","schema":{"type":"object","properties":{"id":{"type":"string"},"chain":{"type":"string","enum":["avalanche"]},"chain_edge_cases":{"type":"string","enum":["AvAlAnChE"]}},"required":["id","chain","chain_edge_cases"]},"key_properties":["id"]}
{"type":"RECORD","stream":"test_storage_optimized_enum_off","record":{"id":"test_still_in_current_schema","chain":"avalanche","chain_edge_cases":"AvAlAnChE"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{"type":"SCHEMA","stream":"test_storage_optimized_enum_on","schema":{"type":"object","properties":{"id":{"type":"string"},"chain":{"type":"string","enum":["avalanche","ethereum","optimism","polygon"]},"chain_edge_cases":{"type":"string","enum":["AvAlAnChE"," EtH ","a\u005Cb","PŒLŸflôÑ","_éèàÀÇÉ","_-.*$@#%","ù€/\\\n\r\t","'"]}},"required":["id","chain","chain_edge_cases"]},"key_properties":["id"]}
{"type":"RECORD","stream":"test_storage_optimized_enum_on","record":{"id":"test_valid_1","chain":"avalanche","chain_edge_cases":"AvAlAnChE"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_storage_optimized_enum_on","record":{"id":"test_valid_2","chain":"ethereum","chain_edge_cases":" EtH "},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_storage_optimized_enum_on","record":{"id":"test_valid_3","chain":"optimism","chain_edge_cases":"a\\b"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_storage_optimized_enum_on","record":{"id":"test_valid_4","chain":"polygon","chain_edge_cases":"PŒLŸflôÑ"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_storage_optimized_enum_on","record":{"id":"test_valid_5","chain":"avalanche","chain_edge_cases":"_éèàÀÇÉ"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_storage_optimized_enum_on","record":{"id":"test_valid_7","chain":"optimism","chain_edge_cases":"ù€/\\\n\r\t"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
{"type":"RECORD","stream":"test_storage_optimized_enum_on","record":{"id":"test_valid_8","chain":"polygon","chain_edge_cases":"'"},"time_extracted":"2023-09-15T19:33:01.841018+00:00"}
Loading
Loading