From bf9a407fceb4729a8497d0377f49484de34f70e7 Mon Sep 17 00:00:00 2001
From: Pat Nadolny
Date: Fri, 26 Jul 2024 12:40:49 -0400
Subject: [PATCH 1/8] add test to alter a table using a reserved word

---
 tests/core.py                                  | 28 +++++++++++++++++++
 .../existing_reserved_name_table_alter.singer  |  2 ++
 2 files changed, 30 insertions(+)
 create mode 100644 tests/target_test_streams/existing_reserved_name_table_alter.singer

diff --git a/tests/core.py b/tests/core.py
index ec5db3f..f7cfb78 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -456,6 +456,33 @@ def setup(self) -> None:
             """,
         )

+class SnowflakeTargetExistingReservedNameTableAlter(SnowflakeTargetExistingTable):
+    name = "existing_reserved_name_table_alter"
+    # This sends a schema that will request altering from TIMESTAMP_NTZ to VARCHAR
+
+    def setup(self) -> None:
+        connector = self.target.default_sink_class.connector_class(self.target.config)
+        table = f'{self.target.config['database']}.{self.target.config['default_target_schema']}."ORDER"'.upper()
+        connector.connection.execute(
+            f"""
+            CREATE OR REPLACE TABLE {table} (
+                ID VARCHAR(16777216),
+                COL_STR VARCHAR(16777216),
+                COL_TS TIMESTAMP_NTZ(9),
+                COL_INT STRING,
+                COL_BOOL BOOLEAN,
+                COL_VARIANT VARIANT,
+                _SDC_BATCHED_AT TIMESTAMP_NTZ(9),
+                _SDC_DELETED_AT VARCHAR(16777216),
+                _SDC_EXTRACTED_AT TIMESTAMP_NTZ(9),
+                _SDC_RECEIVED_AT TIMESTAMP_NTZ(9),
+                _SDC_SEQUENCE NUMBER(38,0),
+                _SDC_TABLE_VERSION NUMBER(38,0),
+                PRIMARY KEY (ID)
+            )
+            """,
+        )
+
 class SnowflakeTargetTypeEdgeCasesTest(TargetFileTestTemplate):
     name = "type_edge_cases"

@@ -539,6 +566,7 @@ def singer_filepath(self) -> Path:
         SnowflakeTargetColonsInColName,
         SnowflakeTargetExistingTable,
         SnowflakeTargetExistingTableAlter,
+        SnowflakeTargetExistingReservedNameTableAlter,
         SnowflakeTargetTypeEdgeCasesTest,
         SnowflakeTargetColumnOrderMismatch,
     ],
diff --git a/tests/target_test_streams/existing_reserved_name_table_alter.singer b/tests/target_test_streams/existing_reserved_name_table_alter.singer
new file mode 100644
index 0000000..a79e5d2
--- /dev/null
+++ b/tests/target_test_streams/existing_reserved_name_table_alter.singer
@@ -0,0 +1,2 @@
+{ "type": "SCHEMA", "stream": "ORDER", "schema": { "properties": { "id": { "type": [ "string", "null" ] }, "col_str": { "type": [ "string", "null" ] }, "col_ts": { "format": "date-time", "type": [ "string", "null" ] }, "col_int": { "type": "integer" }, "col_bool": { "type": [ "boolean", "null" ] }, "col_variant": {"type": "object"} }, "type": "object" }, "key_properties": [ "id" ], "bookmark_properties": [ "col_ts" ] }
+{ "type": "RECORD", "stream": "ORDER", "record": { "id": "123", "col_str": "foo", "col_ts": "2023-06-13 11:50:04.072", "col_int": 5, "col_bool": true, "col_variant": {"key": "val"} }, "time_extracted": "2023-06-14T18:08:23.074716+00:00" }

From 9244352e03cf88f3c8ac61cbacc303eb211140c3 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Fri, 26 Jul 2024 16:42:07 +0000
Subject: [PATCH 2/8] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 tests/core.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/core.py b/tests/core.py
index f7cfb78..38ae952 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -456,6 +456,7 @@ def setup(self) -> None:
             """,
         )

+
 class SnowflakeTargetExistingReservedNameTableAlter(SnowflakeTargetExistingTable):
     name = "existing_reserved_name_table_alter"
     # This sends a schema that will request altering from TIMESTAMP_NTZ to VARCHAR

From 835dc621fb0936d872c76b674bc05178629ceeb7 Mon Sep 17 00:00:00 2001
From: Pat Nadolny
Date: Fri, 26 Jul 2024 13:07:17 -0400
Subject: [PATCH 3/8] test for regular create and merge with reserved word

---
 tests/core.py                                 | 30 ++++++++++++++++++-
 ....singer => reserved_words_in_table.singer} |  0
 2 files changed, 29 insertions(+), 1 deletion(-)
 rename tests/target_test_streams/{existing_reserved_name_table_alter.singer => reserved_words_in_table.singer} (100%)

diff --git a/tests/core.py b/tests/core.py
index f7cfb78..1e17f77 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -456,10 +456,15 @@ def setup(self) -> None:
             """,
         )

-class SnowflakeTargetExistingReservedNameTableAlter(SnowflakeTargetExistingTable):
+class SnowflakeTargetExistingReservedNameTableAlter(TargetFileTestTemplate):
     name = "existing_reserved_name_table_alter"
     # This sends a schema that will request altering from TIMESTAMP_NTZ to VARCHAR

+    @property
+    def singer_filepath(self) -> Path:
+        current_dir = Path(__file__).resolve().parent
+        return current_dir / "target_test_streams" / "reserved_words_in_table.singer"
+
     def setup(self) -> None:
         connector = self.target.default_sink_class.connector_class(self.target.config)
         table = f'{self.target.config['database']}.{self.target.config['default_target_schema']}."ORDER"'.upper()
@@ -483,6 +488,28 @@ def setup(self) -> None:
             """,
         )

+class SnowflakeTargetReservedWordsInTable(TargetFileTestTemplate):
+    # Contains reserved words from
+    # https://docs.snowflake.com/en/sql-reference/reserved-keywords
+    # Syncs records then alters schema by adding a non-reserved word column.
+    name = "reserved_words_in_table"
+
+    @property
+    def singer_filepath(self) -> Path:
+        current_dir = Path(__file__).resolve().parent
+        return current_dir / "target_test_streams" / "reserved_words_in_table.singer"
+
+    def validate(self) -> None:
+        connector = self.target.default_sink_class.connector_class(self.target.config)
+        table = f'{self.target.config['database']}.{self.target.config['default_target_schema']}."ORDER"'.upper()
+        result = connector.connection.execute(
+            f"select * from {table}",
+        )
+        assert result.rowcount == 1
+        row = result.first()
+        assert len(row) == 12, f"Row has unexpected length {len(row)}"
+
+
 class SnowflakeTargetTypeEdgeCasesTest(TargetFileTestTemplate):
     name = "type_edge_cases"

@@ -567,6 +594,7 @@ def singer_filepath(self) -> Path:
         SnowflakeTargetExistingTable,
         SnowflakeTargetExistingTableAlter,
         SnowflakeTargetExistingReservedNameTableAlter,
+        SnowflakeTargetReservedWordsInTable,
         SnowflakeTargetTypeEdgeCasesTest,
         SnowflakeTargetColumnOrderMismatch,
     ],
diff --git a/tests/target_test_streams/existing_reserved_name_table_alter.singer b/tests/target_test_streams/reserved_words_in_table.singer
similarity index 100%
rename from tests/target_test_streams/existing_reserved_name_table_alter.singer
rename to tests/target_test_streams/reserved_words_in_table.singer

From f0fef97b416c9fffc83263bf90268eec61a3c0ce Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Fri, 26 Jul 2024 17:09:00 +0000
Subject: [PATCH 4/8] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 tests/core.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/tests/core.py b/tests/core.py
index 1e17f77..1dbfce3 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -456,6 +456,7 @@ def setup(self) -> None:
             """,
         )

+
 class SnowflakeTargetExistingReservedNameTableAlter(TargetFileTestTemplate):
     name = "existing_reserved_name_table_alter"
     # This sends a schema that will request altering from TIMESTAMP_NTZ to VARCHAR
@@ -488,6 +489,7 @@ def setup(self) -> None:
             """,
         )

+
 class SnowflakeTargetReservedWordsInTable(TargetFileTestTemplate):
     # Contains reserved words from
     # https://docs.snowflake.com/en/sql-reference/reserved-keywords
     # Syncs records then alters schema by adding a non-reserved word column.
@@ -510,7 +512,6 @@ def validate(self) -> None:
         assert len(row) == 12, f"Row has unexpected length {len(row)}"


-
 class SnowflakeTargetTypeEdgeCasesTest(TargetFileTestTemplate):
     name = "type_edge_cases"

From 28d5d18e2b7300a8f75030e359d82c9a4bb1aa9d Mon Sep 17 00:00:00 2001
From: Pat Nadolny
Date: Mon, 12 Aug 2024 17:06:52 -0400
Subject: [PATCH 5/8] fix tests

---
 target_snowflake/connector.py | 102 ++++++++++++++++++++++++++++++----
 target_snowflake/sinks.py     |   4 +-
 tests/core.py                 |   2 +-
 3 files changed, 93 insertions(+), 15 deletions(-)

diff --git a/target_snowflake/connector.py b/target_snowflake/connector.py
index 1bdd4d9..a635d85 100644
--- a/target_snowflake/connector.py
+++ b/target_snowflake/connector.py
@@ -1,5 +1,6 @@
 from __future__ import annotations

+import typing as t
 from operator import contains, eq
 from typing import TYPE_CHECKING, Any, Iterable, Sequence, cast

@@ -61,6 +62,62 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
         self.schema_cache: dict = {}
         super().__init__(*args, **kwargs)

+    def create_empty_table(
+        self,
+        full_table_name: str,
+        schema: dict,
+        primary_keys: t.Sequence[str] | None = None,
+        partition_keys: list[str] | None = None,
+        as_temp_table: bool = False,  # noqa: FBT001, FBT002
+    ) -> None:
+        """Create an empty target table.
+
+        Args:
+            full_table_name: the target table name.
+            schema: the JSON schema for the new table.
+            primary_keys: list of key properties.
+            partition_keys: list of partition keys.
+            as_temp_table: True to create a temp table.
+
+        Raises:
+            NotImplementedError: if temp tables are unsupported and as_temp_table=True.
+            RuntimeError: if a variant schema is passed with no properties defined.
+        """
+        formatter = SnowflakeIdentifierPreparer(SnowflakeDialect())
+        if as_temp_table:
+            msg = "Temporary tables are not supported."
+            raise NotImplementedError(msg)
+
+        _ = partition_keys  # Not supported in generic implementation.
+
+        _, schema_name, table_name = self.parse_full_table_name(full_table_name)
+        columns = []
+        primary_keys = primary_keys or []
+        try:
+            properties: dict = schema["properties"]
+        except KeyError as e:
+            msg = f"Schema for '{full_table_name}' does not define properties: {schema}"
+            raise RuntimeError(msg) from e
+        formatted_pks = []
+        for property_name, property_jsonschema in properties.items():
+            column_type = self.to_sql_type(property_jsonschema)
+            column_def = f"{formatter.quote(property_name)} {column_type}"
+            if property_name in primary_keys:
+                formatted_pks.append(formatter.quote(property_name))
+            columns.append(column_def)
+        pks = ""
+        if formatted_pks:
+            pks = f", PRIMARY KEY ({', '.join(formatted_pks)})"
+        column_definitions_str = ",\n ".join(columns)
+        create_table_sql = f"""
+            CREATE TABLE {formatter.quote(schema_name)}.{formatter.quote(table_name)} (
+                {column_definitions_str}
+                {pks}
+            )
+        """
+        with self._engine.connect() as conn:
+            conn.execute(sqlalchemy.DDL(create_table_sql))
+
     def get_table_columns(
         self,
         full_table_name: str,
@@ -191,7 +248,7 @@ def prepare_column(
         # Make quoted column names upper case because we create them that way
         # and the metadata that SQLAlchemy returns is case insensitive only for non-quoted
         # column names so these will look like they dont exist yet.
-        if '"' in formatter.format_collation(column_name):
+        if '"' in formatter.quote(column_name):
             column_name = column_name.upper()

         try:
@@ -208,6 +265,18 @@ def prepare_column(
             )
             raise

+    @staticmethod
+    def get_column_add_ddl(
+        table_name: str,
+        column_name: str,
+        column_type: sqlalchemy.types.TypeEngine,
+    ) -> sqlalchemy.DDL:
+        return SQLConnector.get_column_add_ddl(
+            SnowflakeConnector._escape_full_table_name(table_name),
+            column_name,
+            column_type,
+        )
+
     @staticmethod
     def get_column_rename_ddl(
         table_name: str,
@@ -218,9 +287,9 @@ def get_column_rename_ddl(
         # Since we build the ddl manually we can't rely on SQLAlchemy to
         # quote column names automatically.
         return SQLConnector.get_column_rename_ddl(
-            table_name,
-            formatter.format_collation(column_name),
-            formatter.format_collation(new_column_name),
+            SnowflakeConnector._escape_full_table_name(table_name),
+            formatter.quote(column_name),
+            formatter.quote(new_column_name),
         )

     @staticmethod
@@ -244,11 +313,12 @@ def get_column_alter_ddl(
         formatter = SnowflakeIdentifierPreparer(SnowflakeDialect())
         # Since we build the ddl manually we can't rely on SQLAlchemy to
         # quote column names automatically.
+        escaped_full_table_name = SnowflakeConnector._escape_full_table_name(table_name)
         return sqlalchemy.DDL(
             "ALTER TABLE %(table_name)s ALTER COLUMN %(column_name)s SET DATA TYPE %(column_type)s",
             {
-                "table_name": table_name,
-                "column_name": formatter.format_collation(column_name),
+                "table_name": escaped_full_table_name,
+                "column_name": formatter.quote(column_name),
                 "column_type": column_type,
             },
         )
@@ -310,7 +380,7 @@ def schema_exists(self, schema_name: str) -> bool:
         # Make quoted schema names upper case because we create them that way
         # and the metadata that SQLAlchemy returns is case insensitive only for
         # non-quoted schema names so these will look like they dont exist yet.
-        if '"' in formatter.format_collation(schema_name):
+        if '"' in formatter.quote(schema_name):
             schema_name = schema_name.upper()

         return schema_name in schema_names
@@ -342,7 +412,7 @@ def _get_column_selections(
     ) -> list:
         column_selections = []
         for property_name, property_def in schema["properties"].items():
-            clean_property_name = formatter.format_collation(property_name)
+            clean_property_name = formatter.quote(property_name)
             clean_alias = clean_property_name
             if '"' in clean_property_name:
                 clean_alias = clean_property_name.upper()
@@ -372,8 +442,8 @@ def _get_merge_from_stage_statement(  # noqa: ANN202
         )

         # use UPPER from here onwards
-        formatted_properties = [formatter.format_collation(col) for col in schema["properties"]]
-        formatted_key_properties = [formatter.format_collation(col) for col in key_properties]
+        formatted_properties = [formatter.quote(col) for col in schema["properties"]]
+        formatted_key_properties = [formatter.quote(col) for col in key_properties]
         join_expr = " and ".join(
             [f"d.{key} = s.{key}" for key in formatted_key_properties],
         )
@@ -386,9 +456,10 @@ def _get_merge_from_stage_statement(  # noqa: ANN202
         )
         dedup_cols = ", ".join(list(formatted_key_properties))
         dedup = f"QUALIFY ROW_NUMBER() OVER (PARTITION BY {dedup_cols} ORDER BY SEQ8() DESC) = 1"
+        escaped_full_table_name = self._escape_full_table_name(full_table_name)
         return (
             text(
-                f"merge into {quoted_name(full_table_name, quote=True)} d using "  # noqa: ISC003
+                f"merge into {escaped_full_table_name} d using "  # noqa: ISC003
                 + f"(select {json_casting_selects} from '@~/target-snowflake/{sync_id}'"  # noqa: S608
                 + f"(file_format => {file_format}) {dedup}) s "
                 + f"on {join_expr} "
@@ -411,9 +482,10 @@ def _get_copy_statement(self, full_table_name, schema, sync_id, file_format):  #
             column_selections,
             "col_alias",
         )
+        escaped_full_table_name = self._escape_full_table_name(full_table_name)
         return (
             text(
-                f"copy into {full_table_name} {col_alias_selects} from "  # noqa: ISC003
+                f"copy into {escaped_full_table_name} {col_alias_selects} from "  # noqa: ISC003
                 + f"(select {json_casting_selects} from "  # noqa: S608
                 + f"'@~/target-snowflake/{sync_id}')"
                 + f"file_format = (format_name='{file_format}')",
@@ -634,3 +706,9 @@ def _adapt_column_type(
             sql_type,
         )
         raise
+
+    @staticmethod
+    def _escape_full_table_name(full_table_name: str) -> str:
+        formatter = SnowflakeIdentifierPreparer(SnowflakeDialect())
+        db_name, schema_name, table_name = SQLConnector().parse_full_table_name(full_table_name)
+        return f"{formatter.quote(db_name)}.{formatter.quote(schema_name)}.{formatter.quote(table_name)}"
diff --git a/target_snowflake/sinks.py b/target_snowflake/sinks.py
index fc980fb..7eb9f35 100644
--- a/target_snowflake/sinks.py
+++ b/target_snowflake/sinks.py
@@ -99,10 +99,10 @@ def conform_name(
         name: str,
         object_type: str | None = None,
     ) -> str:
+        formatter = SnowflakeIdentifierPreparer(SnowflakeDialect())
         if object_type and object_type != "column":
             return super().conform_name(name=name, object_type=object_type)

-        formatter = SnowflakeIdentifierPreparer(SnowflakeDialect())
-        if '"' not in formatter.format_collation(name.lower()):
+        if '"' not in formatter.quote(name.lower()):
             name = name.lower()
         return name
diff --git a/tests/core.py b/tests/core.py
index 1dbfce3..edb977a 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -509,7 +509,7 @@ def validate(self) -> None:
         )
         assert result.rowcount == 1
         row = result.first()
-        assert len(row) == 12, f"Row has unexpected length {len(row)}"
+        assert len(row) == 13, f"Row has unexpected length {len(row)}"


 class SnowflakeTargetTypeEdgeCasesTest(TargetFileTestTemplate):

From 8b405654b3d070182a1c2f42d6b814c2cf9c4944 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Mon, 12 Aug 2024 21:07:02 +0000
Subject: [PATCH 6/8] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 target_snowflake/connector.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/target_snowflake/connector.py b/target_snowflake/connector.py
index a635d85..6bd8b9f 100644
--- a/target_snowflake/connector.py
+++ b/target_snowflake/connector.py
@@ -13,7 +13,7 @@
 from snowflake.sqlalchemy import URL
 from snowflake.sqlalchemy.base import SnowflakeIdentifierPreparer
 from snowflake.sqlalchemy.snowdialect import SnowflakeDialect
-from sqlalchemy.sql import quoted_name, text
+from sqlalchemy.sql import text

 from target_snowflake.snowflake_types import NUMBER, TIMESTAMP_NTZ, VARIANT

From 46650af1e94e4530ac9abfd9eeaa6f2a0d8d90c6 Mon Sep 17 00:00:00 2001
From: Pat Nadolny
Date: Mon, 12 Aug 2024 17:11:58 -0400
Subject: [PATCH 7/8] remove create table sql building

---
 target_snowflake/connector.py | 56 -----------------------------------
 1 file changed, 56 deletions(-)

diff --git a/target_snowflake/connector.py b/target_snowflake/connector.py
index a635d85..0e04a04 100644
--- a/target_snowflake/connector.py
+++ b/target_snowflake/connector.py
@@ -62,62 +62,6 @@ def __init__(self, *args: Any, **kwargs: Any) -> None:
         self.schema_cache: dict = {}
         super().__init__(*args, **kwargs)

-    def create_empty_table(
-        self,
-        full_table_name: str,
-        schema: dict,
-        primary_keys: t.Sequence[str] | None = None,
-        partition_keys: list[str] | None = None,
-        as_temp_table: bool = False,  # noqa: FBT001, FBT002
-    ) -> None:
-        """Create an empty target table.
-
-        Args:
-            full_table_name: the target table name.
-            schema: the JSON schema for the new table.
-            primary_keys: list of key properties.
-            partition_keys: list of partition keys.
-            as_temp_table: True to create a temp table.
-
-        Raises:
-            NotImplementedError: if temp tables are unsupported and as_temp_table=True.
-            RuntimeError: if a variant schema is passed with no properties defined.
-        """
-        formatter = SnowflakeIdentifierPreparer(SnowflakeDialect())
-        if as_temp_table:
-            msg = "Temporary tables are not supported."
-            raise NotImplementedError(msg)
-
-        _ = partition_keys  # Not supported in generic implementation.
-
-        _, schema_name, table_name = self.parse_full_table_name(full_table_name)
-        columns = []
-        primary_keys = primary_keys or []
-        try:
-            properties: dict = schema["properties"]
-        except KeyError as e:
-            msg = f"Schema for '{full_table_name}' does not define properties: {schema}"
-            raise RuntimeError(msg) from e
-        formatted_pks = []
-        for property_name, property_jsonschema in properties.items():
-            column_type = self.to_sql_type(property_jsonschema)
-            column_def = f"{formatter.quote(property_name)} {column_type}"
-            if property_name in primary_keys:
-                formatted_pks.append(formatter.quote(property_name))
-            columns.append(column_def)
-        pks = ""
-        if formatted_pks:
-            pks = f", PRIMARY KEY ({', '.join(formatted_pks)})"
-        column_definitions_str = ",\n ".join(columns)
-        create_table_sql = f"""
-            CREATE TABLE {formatter.quote(schema_name)}.{formatter.quote(table_name)} (
-                {column_definitions_str}
-                {pks}
-            )
-        """
-        with self._engine.connect() as conn:
-            conn.execute(sqlalchemy.DDL(create_table_sql))
-
     def get_table_columns(
         self,
         full_table_name: str,

From 019c13e8e806b4ca3fc5b9b4bd90dae77a55fea8 Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Mon, 12 Aug 2024 21:12:15 +0000
Subject: [PATCH 8/8] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 target_snowflake/connector.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/target_snowflake/connector.py b/target_snowflake/connector.py
index 436b7dc..6f431a4 100644
--- a/target_snowflake/connector.py
+++ b/target_snowflake/connector.py
@@ -1,6 +1,5 @@
 from __future__ import annotations

-import typing as t
 from operator import contains, eq
 from typing import TYPE_CHECKING, Any, Iterable, Sequence, cast
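
The change these patches converge on is replacing formatter.format_collation(...) with formatter.quote(...) and routing table names through the new _escape_full_table_name helper, so a reserved identifier such as ORDER is double-quoted before it is spliced into DDL and MERGE/COPY statements. The sketch below is illustrative only, not part of the patch series: it assumes snowflake-sqlalchemy is installed and that SnowflakeIdentifierPreparer.quote() behaves like SQLAlchemy's IdentifierPreparer.quote(), adding double quotes only when the identifier needs them; the escape_full_table_name function and its naive split(".") are a hypothetical stand-in for the real helper, which uses the SDK's parse_full_table_name.

# Illustrative sketch (assumption-laden, see note above): reserved-word quoting
# with the same preparer the patches use.
from snowflake.sqlalchemy.base import SnowflakeIdentifierPreparer
from snowflake.sqlalchemy.snowdialect import SnowflakeDialect

formatter = SnowflakeIdentifierPreparer(SnowflakeDialect())

# A reserved word like ORDER is expected to come back double-quoted; whether an
# ordinary name is quoted depends on the preparer's case and reserved-word rules.
for ident in ("ORDER", "order", "MY_TABLE"):
    print(ident, "->", formatter.quote(ident))


def escape_full_table_name(full_table_name: str) -> str:
    """Rough stand-in for the _escape_full_table_name helper added in PATCH 5:
    quote each part of DATABASE.SCHEMA.TABLE independently (the real helper
    parses the name with the SDK instead of a naive split)."""
    db_name, schema_name, table_name = full_table_name.split(".")
    return ".".join(formatter.quote(part) for part in (db_name, schema_name, table_name))


# For example, a table literally named ORDER should end up quoted in the
# rendered name while the other parts are left to the preparer's rules.
print(escape_full_table_name("MY_DB.MY_SCHEMA.ORDER"))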