diff --git a/dlt/destinations/impl/snowflake/configuration.py b/dlt/destinations/impl/snowflake/configuration.py
index f1c976d02d..ca93d8afdb 100644
--- a/dlt/destinations/impl/snowflake/configuration.py
+++ b/dlt/destinations/impl/snowflake/configuration.py
@@ -141,6 +141,9 @@ class SnowflakeClientConfiguration(DestinationClientDwhWithStagingConfiguration)
     create_indexes: bool = False
     """Whether UNIQUE or PRIMARY KEY constrains should be created"""
 
+    use_vectorized_scanner: bool = False
+    """Whether to use the vectorized scanner in COPY INTO"""
+
     def fingerprint(self) -> str:
         """Returns a fingerprint of host part of a connection string"""
         if self.credentials and self.credentials.host:
diff --git a/dlt/destinations/impl/snowflake/snowflake.py b/dlt/destinations/impl/snowflake/snowflake.py
index a63f41a28b..b53e4ddc40 100644
--- a/dlt/destinations/impl/snowflake/snowflake.py
+++ b/dlt/destinations/impl/snowflake/snowflake.py
@@ -85,6 +85,7 @@ def run(self) -> None:
                 stage_file_path,
                 self._staging_credentials,
                 self._config.csv_format,
+                self._config.use_vectorized_scanner,
             )
 
         with self._sql_client.begin_transaction():
@@ -109,6 +110,7 @@ def gen_copy_sql(
         local_stage_file_path: Optional[str] = None,
         staging_credentials: Optional[CredentialsConfiguration] = None,
         csv_format: Optional[CsvFormatConfiguration] = None,
+        use_vectorized_scanner: Optional[bool] = False,
     ) -> str:
         parsed_file_url = urlparse(file_url)
         # check if local filesystem (file scheme or just a local file in native form)
@@ -171,11 +173,11 @@ def gen_copy_sql(
         if loader_file_format == "jsonl":
             source_format = "( TYPE = 'JSON', BINARY_FORMAT = 'BASE64' )"
         elif loader_file_format == "parquet":
-            source_format = (
-                "(TYPE = 'PARQUET', BINARY_AS_TEXT = FALSE, USE_LOGICAL_TYPE = TRUE)"
-                # TODO: USE_VECTORIZED_SCANNER inserts null strings into VARIANT JSON
-                # " USE_VECTORIZED_SCANNER = TRUE)"
-            )
+            source_format = "(TYPE = 'PARQUET', BINARY_AS_TEXT = FALSE, USE_LOGICAL_TYPE = TRUE"
+            if use_vectorized_scanner:
+                source_format += ", USE_VECTORIZED_SCANNER = TRUE"
+                on_error_clause = "ON_ERROR = ABORT_STATEMENT"
+            source_format += ")"
         elif loader_file_format == "csv":
             # empty strings are NULL, no data is NULL, missing columns (ERROR_ON_COLUMN_COUNT_MISMATCH) are NULL
             csv_format = csv_format or CsvFormatConfiguration()
diff --git a/docs/website/docs/dlt-ecosystem/destinations/snowflake.md b/docs/website/docs/dlt-ecosystem/destinations/snowflake.md
index 28684c39ac..e5f02b0805 100644
--- a/docs/website/docs/dlt-ecosystem/destinations/snowflake.md
+++ b/docs/website/docs/dlt-ecosystem/destinations/snowflake.md
@@ -183,6 +183,17 @@ When staging is enabled:
 When loading from Parquet, Snowflake will store `json` types (JSON) in `VARIANT` as a string. Use the JSONL format instead or use `PARSE_JSON` to update the `VARIANT` field after loading.
 :::
 
+When using the Parquet format, you can enable the **vectorized scanner** to improve loading performance. When this option is enabled, `dlt` also sets `ON_ERROR=ABORT_STATEMENT`, which aborts the COPY statement if an error occurs.
+To enable the vectorized scanner, add the following to your configuration:
+
+```toml
+[destination.snowflake]
+use_vectorized_scanner=true
+```
+:::note
+The **vectorized scanner** explicitly displays `NULL` values in its output: `NULL` values loaded into `VARIANT` (JSON) columns are read back as the string `"null"`. Please refer to the official Snowflake documentation for its full characteristics and limitations.
+:::
+
 ### Custom CSV formats
 
 By default, we support the CSV format [produced by our writers](../file-formats/csv.md#default-settings), which is comma-delimited, with a header, and optionally quoted.
@@ -217,9 +228,9 @@ Names of tables and columns in [schemas](../../general-usage/schema.md) are kept
 
 ## Staging support
 
-Snowflake supports S3 and GCS as file staging destinations. `dlt` will upload files in the parquet format to the bucket provider and will ask Snowflake to copy their data directly into the db.
+Snowflake supports S3 and GCS as file staging destinations. `dlt` will upload files in the Parquet format to the bucket provider and will ask Snowflake to copy their data directly into the db.
 
-Alternatively to parquet files, you can also specify jsonl as the staging file format. For this, set the `loader_file_format` argument of the `run` command of the pipeline to `jsonl`.
+Alternatively to Parquet files, you can also specify jsonl as the staging file format. For this, set the `loader_file_format` argument of the `run` command of the pipeline to `jsonl`.
 
 ### Snowflake and Amazon S3
 
@@ -324,6 +335,8 @@ stage_name="DLT_STAGE"
 keep_staged_files=true
 # Add UNIQUE and PRIMARY KEY hints to tables
 create_indexes=true
+# Enable the vectorized scanner when using the Parquet format
+use_vectorized_scanner=true
 ```
 
 ### Setting up CSV format
diff --git a/tests/load/pipeline/test_snowflake_pipeline.py b/tests/load/pipeline/test_snowflake_pipeline.py
index 16e95bba7a..89e47ff78f 100644
--- a/tests/load/pipeline/test_snowflake_pipeline.py
+++ b/tests/load/pipeline/test_snowflake_pipeline.py
@@ -1,3 +1,4 @@
+from copy import deepcopy
 import os
 import pytest
 from pytest_mock import MockerFixture
@@ -6,7 +7,6 @@
 from dlt.common import pendulum
 from dlt.common.utils import uniq_id
 from dlt.destinations.exceptions import DatabaseUndefinedRelation
-
 from dlt.load.exceptions import LoadClientJobFailed
 from dlt.pipeline.exceptions import PipelineStepFailed
 
@@ -14,10 +14,14 @@
 from tests.load.snowflake.test_snowflake_client import QUERY_TAG
 from tests.pipeline.utils import assert_load_info, assert_query_data
 from tests.load.utils import (
+    TABLE_UPDATE_COLUMNS_SCHEMA,
+    assert_all_data_types_row,
     destinations_configs,
     DestinationTestConfiguration,
     drop_active_pipeline_data,
 )
+from tests.cases import TABLE_ROW_ALL_DATA_TYPES_DATETIMES
+
 
 # mark all tests as essential, do not remove
 pytestmark = pytest.mark.essential
@@ -204,3 +208,112 @@ def test_char_replacement_cs_naming_convention(
     results = rel_.fetchall()
     assert len(results) == 1
     assert "AmlSistUtfoertDato" in rel_.columns_schema
+
+
+@pytest.mark.parametrize(
+    "destination_config",
+    destinations_configs(
+        default_sql_configs=True,
+        all_staging_configs=True,
+        with_file_format="parquet",
+        subset=["snowflake"],
+    ),
+    ids=lambda x: x.name,
+)
+@pytest.mark.parametrize(
+    "use_vectorized_scanner",
+    ["TRUE", "FALSE"],
+)
+def test_snowflake_use_vectorized_scanner(
+    destination_config: DestinationTestConfiguration,
+    use_vectorized_scanner: str,
+    mocker: MockerFixture,
+) -> None:
+    """Tests whether the vectorized scanner option is correctly applied when loading Parquet files into Snowflake."""
+
+    from dlt.destinations.impl.snowflake.snowflake import SnowflakeLoadJob
+
+    os.environ["DESTINATION__SNOWFLAKE__USE_VECTORIZED_SCANNER"] = use_vectorized_scanner
+
+    load_job_spy = mocker.spy(SnowflakeLoadJob, "gen_copy_sql")
+
+    data_types = deepcopy(TABLE_ROW_ALL_DATA_TYPES_DATETIMES)
+    column_schemas = deepcopy(TABLE_UPDATE_COLUMNS_SCHEMA)
+    expected_rows = deepcopy(TABLE_ROW_ALL_DATA_TYPES_DATETIMES)
+
+    @dlt.resource(table_name="data_types", write_disposition="merge", columns=column_schemas)
+    def my_resource():
+        nonlocal data_types
+        yield [data_types] * 10
+
+    pipeline = destination_config.setup_pipeline(
+        f"vectorized_scanner_{use_vectorized_scanner}_{uniq_id()}",
+        dataset_name="parquet_test_" + uniq_id(),
+    )
+
+    info = pipeline.run(my_resource(), **destination_config.run_kwargs)
+    package_info = pipeline.get_load_package_info(info.loads_ids[0])
+    assert package_info.state == "loaded"
+    assert len(package_info.jobs["failed_jobs"]) == 0
+    # 1 table + 1 state + 2 reference jobs if staging
+    expected_completed_jobs = 2 + 2 if pipeline.staging else 2
+    # add sql merge job
+    if destination_config.supports_merge:
+        expected_completed_jobs += 1
+    assert len(package_info.jobs["completed_jobs"]) == expected_completed_jobs
+
+    if use_vectorized_scanner == "FALSE":
+        # the vectorized scanner must not appear in any copy job
+        assert sum(
+            [
+                1
+                for spy_return in load_job_spy.spy_return_list
+                if "USE_VECTORIZED_SCANNER = TRUE" not in spy_return
+            ]
+        ) == len(load_job_spy.spy_return_list)
+        assert sum(
+            [
+                1
+                for spy_return in load_job_spy.spy_return_list
+                if "ON_ERROR = ABORT_STATEMENT" not in spy_return
+            ]
+        ) == len(load_job_spy.spy_return_list)
+    elif use_vectorized_scanner == "TRUE":
+        # the vectorized scanner appears in exactly one copy job: the one loading data_types
+        assert (
+            sum(
+                [
+                    1
+                    for spy_return in load_job_spy.spy_return_list
+                    if "USE_VECTORIZED_SCANNER = TRUE" in spy_return
+                ]
+            )
+            == 1
+        )
+        assert (
+            sum(
+                [
+                    1
+                    for spy_return in load_job_spy.spy_return_list
+                    if "ON_ERROR = ABORT_STATEMENT" in spy_return
+                ]
+            )
+            == 1
+        )
+
+        # the vectorized scanner shows NULL values in json outputs when enabled
+        # as a result, when queried back, we receive the string "null" in the json column
+        expected_rows["col9_null"] = "null"
+
+    with pipeline.sql_client() as sql_client:
+        qual_name = sql_client.make_qualified_table_name
+        db_rows = sql_client.execute_sql(f"SELECT * FROM {qual_name('data_types')}")
+        assert len(db_rows) == 10
+        db_row = list(db_rows[0])
+        # "snowflake" does not parse JSON from parquet string so double parse
+        assert_all_data_types_row(
+            db_row,
+            expected_row=expected_rows,
+            schema=column_schemas,
+            parse_json_strings=True,
+            timestamp_precision=6,
+        )
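
For reference, a minimal sketch of what the patched Parquet branch of `gen_copy_sql` assembles when `use_vectorized_scanner` is enabled. Only the `FILE_FORMAT` assembly mirrors the code above; the `COPY INTO` skeleton, table name, and stage path are illustrative placeholders, not the exact statement `dlt` generates.

```python
# Sketch of the FILE_FORMAT assembly from the patched Parquet branch.
# MY_DATASET.DATA_TYPES and @MY_STAGE/... are made-up placeholders.
use_vectorized_scanner = True

source_format = "(TYPE = 'PARQUET', BINARY_AS_TEXT = FALSE, USE_LOGICAL_TYPE = TRUE"
on_error_clause = ""
if use_vectorized_scanner:
    source_format += ", USE_VECTORIZED_SCANNER = TRUE"
    # errors abort the whole COPY statement instead of skipping rows
    on_error_clause = "ON_ERROR = ABORT_STATEMENT"
source_format += ")"

copy_sql = (
    "COPY INTO MY_DATASET.DATA_TYPES\n"
    "FROM @MY_STAGE/load_id/data_types.parquet\n"
    f"FILE_FORMAT = {source_format}\n"
    f"{on_error_clause}"
)
print(copy_sql)
```

With the default `use_vectorized_scanner = False`, the clause stays `(TYPE = 'PARQUET', BINARY_AS_TEXT = FALSE, USE_LOGICAL_TYPE = TRUE)` and no `ON_ERROR` clause is appended, matching the behavior before this change.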