Feat/2251-use vectorized scanner #2329

Merged: 7 commits, Feb 26, 2025
3 changes: 3 additions & 0 deletions dlt/destinations/impl/snowflake/configuration.py
@@ -141,6 +141,9 @@ class SnowflakeClientConfiguration(DestinationClientDwhWithStagingConfiguration)
    create_indexes: bool = False
    """Whether UNIQUE or PRIMARY KEY constraints should be created"""

    use_vectorized_scanner: bool = False
    """Whether to use the vectorized scanner in COPY INTO"""

    def fingerprint(self) -> str:
        """Returns a fingerprint of host part of a connection string"""
        if self.credentials and self.credentials.host:
12 changes: 7 additions & 5 deletions dlt/destinations/impl/snowflake/snowflake.py
@@ -85,6 +85,7 @@ def run(self) -> None:
                stage_file_path,
                self._staging_credentials,
                self._config.csv_format,
                self._config.use_vectorized_scanner,
            )

        with self._sql_client.begin_transaction():
@@ -109,6 +110,7 @@ def gen_copy_sql(
        local_stage_file_path: Optional[str] = None,
        staging_credentials: Optional[CredentialsConfiguration] = None,
        csv_format: Optional[CsvFormatConfiguration] = None,
        use_vectorized_scanner: Optional[bool] = False,
    ) -> str:
        parsed_file_url = urlparse(file_url)
        # check if local filesystem (file scheme or just a local file in native form)
@@ -171,11 +173,11 @@
        if loader_file_format == "jsonl":
            source_format = "( TYPE = 'JSON', BINARY_FORMAT = 'BASE64' )"
        elif loader_file_format == "parquet":
            source_format = (
                "(TYPE = 'PARQUET', BINARY_AS_TEXT = FALSE, USE_LOGICAL_TYPE = TRUE)"
                # TODO: USE_VECTORIZED_SCANNER inserts null strings into VARIANT JSON
                # " USE_VECTORIZED_SCANNER = TRUE)"
            )
            source_format = "(TYPE = 'PARQUET', BINARY_AS_TEXT = FALSE, USE_LOGICAL_TYPE = TRUE"
            if use_vectorized_scanner:
                source_format += ", USE_VECTORIZED_SCANNER = TRUE"
                on_error_clause = "ON_ERROR = ABORT_STATEMENT"
            source_format += ")"
        elif loader_file_format == "csv":
            # empty strings are NULL, no data is NULL, missing columns (ERROR_ON_COLUMN_COUNT_MISMATCH) are NULL
            csv_format = csv_format or CsvFormatConfiguration()
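For reference, a minimal sketch of what the new branch assembles for Parquet loads. The helper name `parquet_copy_options` is hypothetical and only mirrors the diffed logic above; it is not part of `dlt`:

```py
def parquet_copy_options(use_vectorized_scanner: bool = False) -> tuple[str, str]:
    """Mirror of the branch above: returns (FILE_FORMAT options, ON_ERROR clause)."""
    source_format = "(TYPE = 'PARQUET', BINARY_AS_TEXT = FALSE, USE_LOGICAL_TYPE = TRUE"
    on_error_clause = ""
    if use_vectorized_scanner:
        source_format += ", USE_VECTORIZED_SCANNER = TRUE"
        # the scanner is only enabled together with ABORT_STATEMENT
        on_error_clause = "ON_ERROR = ABORT_STATEMENT"
    source_format += ")"
    return source_format, on_error_clause


print(parquet_copy_options(use_vectorized_scanner=True))
# ("(TYPE = 'PARQUET', BINARY_AS_TEXT = FALSE, USE_LOGICAL_TYPE = TRUE, USE_VECTORIZED_SCANNER = TRUE)",
#  "ON_ERROR = ABORT_STATEMENT")
```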
17 changes: 15 additions & 2 deletions docs/website/docs/dlt-ecosystem/destinations/snowflake.md
@@ -183,6 +183,17 @@ When staging is enabled:
When loading from Parquet, Snowflake will store `json` types (JSON) in `VARIANT` as a string. Use the JSONL format instead or use `PARSE_JSON` to update the `VARIANT` field after loading.
:::

When using the Parquet format, you can enable the **vectorized scanner** to improve loading performance. When the scanner is enabled, `dlt` sets `ON_ERROR = ABORT_STATEMENT`, which aborts the COPY statement as soon as an error is encountered.
To enable the vectorized scanner, add the following to your configuration:

```toml
[destination.snowflake]
use_vectorized_scanner=true
```
:::note
The **vectorized scanner** explicitly shows `NULL` values in its output: missing values in `json` (VARIANT) columns are loaded as JSON `null` rather than being omitted. It also has other behavioral differences, so please refer to the official Snowflake documentation before enabling it.
:::
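
Alternatively, like any other `dlt` configuration value, the setting can come from an environment variable. The test added in this PR does exactly that; shown here as a minimal sketch:

```py
import os

# equivalent to use_vectorized_scanner=true in the [destination.snowflake] config section
os.environ["DESTINATION__SNOWFLAKE__USE_VECTORIZED_SCANNER"] = "TRUE"
```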

### Custom CSV formats
By default, we support the CSV format [produced by our writers](../file-formats/csv.md#default-settings), which is comma-delimited, with a header, and optionally quoted.

@@ -217,9 +228,9 @@ Names of tables and columns in [schemas](../../general-usage/schema.md) are kept

## Staging support

Snowflake supports S3 and GCS as file staging destinations. `dlt` will upload files in the parquet format to the bucket provider and will ask Snowflake to copy their data directly into the db.
Snowflake supports S3 and GCS as file staging destinations. `dlt` will upload files in the Parquet format to the bucket provider and will ask Snowflake to copy their data directly into the db.

Alternatively to parquet files, you can also specify jsonl as the staging file format. For this, set the `loader_file_format` argument of the `run` command of the pipeline to `jsonl`.
Alternatively to Parquet files, you can also specify jsonl as the staging file format. For this, set the `loader_file_format` argument of the `run` command of the pipeline to `jsonl`.

### Snowflake and Amazon S3

@@ -324,6 +335,8 @@ stage_name="DLT_STAGE"
keep_staged_files=true
# Add UNIQUE and PRIMARY KEY hints to tables
create_indexes=true
# Enable vectorized scanner when using the Parquet format
use_vectorized_scanner=true
```
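
The option can also be passed programmatically when constructing the destination. This is a sketch under the assumption that the `snowflake` destination factory forwards extra configuration kwargs (as it does for options such as `csv_format`); it is not taken from the PR itself:

```py
import dlt

# assumption: the factory forwards use_vectorized_scanner to SnowflakeClientConfiguration
pipeline = dlt.pipeline(
    pipeline_name="snowflake_vectorized",
    destination=dlt.destinations.snowflake(use_vectorized_scanner=True),
    dataset_name="my_dataset",
)
```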

### Setting up CSV format
115 changes: 114 additions & 1 deletion tests/load/pipeline/test_snowflake_pipeline.py
@@ -1,3 +1,4 @@
from copy import deepcopy
import os
import pytest
from pytest_mock import MockerFixture
@@ -6,18 +7,21 @@
from dlt.common import pendulum
from dlt.common.utils import uniq_id
from dlt.destinations.exceptions import DatabaseUndefinedRelation

from dlt.load.exceptions import LoadClientJobFailed
from dlt.pipeline.exceptions import PipelineStepFailed

from tests.load.pipeline.test_pipelines import simple_nested_pipeline
from tests.load.snowflake.test_snowflake_client import QUERY_TAG
from tests.pipeline.utils import assert_load_info, assert_query_data
from tests.load.utils import (
    TABLE_UPDATE_COLUMNS_SCHEMA,
    assert_all_data_types_row,
    destinations_configs,
    DestinationTestConfiguration,
    drop_active_pipeline_data,
)
from tests.cases import TABLE_ROW_ALL_DATA_TYPES_DATETIMES


# mark all tests as essential, do not remove
pytestmark = pytest.mark.essential
@@ -204,3 +208,112 @@ def test_char_replacement_cs_naming_convention(
    results = rel_.fetchall()
    assert len(results) == 1
    assert "AmlSistUtfoertDato" in rel_.columns_schema


@pytest.mark.parametrize(
    "destination_config",
    destinations_configs(
        default_sql_configs=True,
        all_staging_configs=True,
        with_file_format="parquet",
        subset=["snowflake"],
    ),
    ids=lambda x: x.name,
)
@pytest.mark.parametrize(
    "use_vectorized_scanner",
    ["TRUE", "FALSE"],
)
def test_snowflake_use_vectorized_scanner(
    destination_config, use_vectorized_scanner: str, mocker: MockerFixture
) -> None:
    """Tests whether the vectorized scanner option is correctly applied when loading Parquet files into Snowflake."""

    from dlt.destinations.impl.snowflake.snowflake import SnowflakeLoadJob

    os.environ["DESTINATION__SNOWFLAKE__USE_VECTORIZED_SCANNER"] = use_vectorized_scanner

    load_job_spy = mocker.spy(SnowflakeLoadJob, "gen_copy_sql")

    data_types = deepcopy(TABLE_ROW_ALL_DATA_TYPES_DATETIMES)
    column_schemas = deepcopy(TABLE_UPDATE_COLUMNS_SCHEMA)
    expected_rows = deepcopy(TABLE_ROW_ALL_DATA_TYPES_DATETIMES)

    @dlt.resource(table_name="data_types", write_disposition="merge", columns=column_schemas)
    def my_resource():
        nonlocal data_types
        yield [data_types] * 10

    pipeline = destination_config.setup_pipeline(
        f"vectorized_scanner_{use_vectorized_scanner}_{uniq_id()}",
        dataset_name="parquet_test_" + uniq_id(),
    )

    info = pipeline.run(my_resource(), **destination_config.run_kwargs)
    package_info = pipeline.get_load_package_info(info.loads_ids[0])
    assert package_info.state == "loaded"
    assert len(package_info.jobs["failed_jobs"]) == 0
    # 1 table + 1 state + 2 reference jobs if staging
    expected_completed_jobs = 2 + 2 if pipeline.staging else 2
    # add sql merge job
    if destination_config.supports_merge:
        expected_completed_jobs += 1
    assert len(package_info.jobs["completed_jobs"]) == expected_completed_jobs

    if use_vectorized_scanner == "FALSE":
        # no vectorized scanner in all copy jobs
        assert sum(
            [
                1
                for spy_return in load_job_spy.spy_return_list
                if "USE_VECTORIZED_SCANNER = TRUE" not in spy_return
            ]
        ) == len(load_job_spy.spy_return_list)
        assert sum(
            [
                1
                for spy_return in load_job_spy.spy_return_list
                if "ON_ERROR = ABORT_STATEMENT" not in spy_return
            ]
        ) == len(load_job_spy.spy_return_list)

    elif use_vectorized_scanner == "TRUE":
        # vectorized scanner in one copy job to data_types
        assert (
            sum(
                [
                    1
                    for spy_return in load_job_spy.spy_return_list
                    if "USE_VECTORIZED_SCANNER = TRUE" in spy_return
                ]
            )
            == 1
        )
        assert (
            sum(
                [
                    1
                    for spy_return in load_job_spy.spy_return_list
                    if "ON_ERROR = ABORT_STATEMENT" in spy_return
                ]
            )
            == 1
        )

        # the vectorized scanner shows NULL values in json outputs when enabled
        # as a result, when queried back, we receive a string "null" in json type
        expected_rows["col9_null"] = "null"

    with pipeline.sql_client() as sql_client:
        qual_name = sql_client.make_qualified_table_name
        db_rows = sql_client.execute_sql(f"SELECT * FROM {qual_name('data_types')}")
        assert len(db_rows) == 10
        db_row = list(db_rows[0])
        # "snowflake" does not parse JSON from parquet string so double parse
        assert_all_data_types_row(
            db_row,
            expected_row=expected_rows,
            schema=column_schemas,
            parse_json_strings=True,
            timestamp_precision=6,
        )