Skip to content

Commit

Permalink
Merge pull request #376 from dbt-msft/incremental-tests
Browse files Browse the repository at this point in the history
[incremental models] add tests, various bugfixes and support for incremental predicates
  • Loading branch information
sdebruyn authored May 15, 2023
2 parents c6a9748 + c28fe8d commit 6510b76
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 43 deletions.
9 changes: 8 additions & 1 deletion dbt/adapters/sqlserver/sql_server_column.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import ClassVar, Dict
from typing import Any, ClassVar, Dict

from dbt.adapters.base import Column

Expand All @@ -11,3 +11,10 @@ class SQLServerColumn(Column):
"INTEGER": "INT",
"BOOLEAN": "BIT",
}

@classmethod
def string_type(cls, size: int) -> str:
return f"varchar({size if size > 0 else 'MAX'})"

def literal(self, value: Any) -> str:
return "cast('{}' as {})".format(value, self.data_type)
29 changes: 24 additions & 5 deletions dbt/include/sqlserver/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,31 @@
{%- set tmp_column = column_name + "__dbt_alter" -%}

{% call statement('alter_column_type') -%}

alter {{ relation.type }} {{ relation }} add {{ tmp_column }} {{ new_column_type }};
update {{ relation }} set {{ tmp_column }} = {{ column_name }};
alter {{ relation.type }} {{ relation }} drop column {{ column_name }};
alter {{ relation.type }} {{ relation }} add "{{ tmp_column }}" {{ new_column_type }};
{%- endcall -%}
{% call statement('alter_column_type') -%}
update {{ relation }} set "{{ tmp_column }}" = "{{ column_name }}";
{%- endcall -%}
{% call statement('alter_column_type') -%}
alter {{ relation.type }} {{ relation }} drop column "{{ column_name }}";
{%- endcall -%}
{% call statement('alter_column_type') -%}
exec sp_rename '{{ relation | replace('"', '') }}.{{ tmp_column }}', '{{ column_name }}', 'column'
{%- endcall -%}
{% endmacro %}
{% macro sqlserver__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}
{% call statement('add_drop_columns') -%}
{% if add_columns %}
alter {{ relation.type }} {{ relation }}
add {% for column in add_columns %}"{{ column.name }}" {{ column.data_type }}{{ ', ' if not loop.last }}{% endfor %};
{% endif %}
{% if remove_columns %}
alter {{ relation.type }} {{ relation }}
drop column {% for column in remove_columns %}"{{ column.name }}"{{ ',' if not loop.last }}{% endfor %};
{% endif %}
{%- endcall -%}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,53 @@
https://getdbt.slack.com/archives/C50NEBJGG/p1636045535056600
#}

{% macro sqlserver__get_merge_sql(target, source, unique_key, dest_columns, predicates) %}
{{ default__get_merge_sql(target, source, unique_key, dest_columns, predicates) }};
{% macro sqlserver__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) %}
{{ default__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) }};
{% endmacro %}

{% macro sqlserver__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %}
{% if incremental_predicates %}
{{ exceptions.raise_not_implemented('incremental_predicates are not implemented in dbt-sqlserver') }}
{% endif %}
{% macro sqlserver__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) %}
{{ default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) }};
{% endmacro %}

{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{% macro sqlserver__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) %}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

{% if unique_key %}
{% if unique_key is sequence and unique_key is not string %}
delete from {{ target }}
where exists (
SELECT NULL
FROM
{{ source }}
WHERE
{% if unique_key %}
{% if unique_key is sequence and unique_key is not string %}
delete from {{ target }}
where exists (
select null
from {{ source }}
where
{% for key in unique_key %}
{{ source }}.{{ key }} = {{ target }}.{{ key }}
{{ "and " if not loop.last }}
{% endfor %}
);
{% else %}
delete from {{ target }}
where (
{{ unique_key }}) in (
select ({{ unique_key }})
from {{ source }}
);

{% endif %}
{% endif %}

insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)

{% endmacro %}

{% macro sqlserver__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) %}
{{ default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) }};
)
{% if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{% endif %};
{% else %}
delete from {{ target }}
where (
{{ unique_key }}) in (
select ({{ unique_key }})
from {{ source }}
)
{%- if incremental_predicates %}
{% for predicate in incremental_predicates %}
and {{ predicate }}
{% endfor %}
{%- endif -%};
{% endif %}
{% endif %}

insert into {{ target }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ source }}
)
{% endmacro %}
115 changes: 115 additions & 0 deletions tests/functional/adapter/test_incremental.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,120 @@
import pytest
from dbt.tests.adapter.incremental.fixtures import (
_MODELS__A,
_MODELS__INCREMENTAL_APPEND_NEW_COLUMNS,
_MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_REMOVE_ONE,
_MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_REMOVE_ONE_TARGET,
_MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_TARGET,
_MODELS__INCREMENTAL_FAIL,
_MODELS__INCREMENTAL_IGNORE_TARGET,
_MODELS__INCREMENTAL_SYNC_ALL_COLUMNS,
_MODELS__INCREMENTAL_SYNC_REMOVE_ONLY,
)
from dbt.tests.adapter.incremental.test_incremental_on_schema_change import (
BaseIncrementalOnSchemaChange,
)
from dbt.tests.adapter.incremental.test_incremental_predicates import BaseIncrementalPredicates
from dbt.tests.adapter.incremental.test_incremental_unique_id import BaseIncrementalUniqueKey

_MODELS__INCREMENTAL_IGNORE = """
{{
config(
materialized='incremental',
unique_key='id',
on_schema_change='ignore'
)
}}
WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
{% if is_incremental() %}
SELECT
id,
field1,
field2,
field3,
field4
FROM source_data
WHERE id NOT IN (SELECT id from {{ this }} )
{% else %}
SELECT TOP 3 id, field1, field2 FROM source_data
{% endif %}
"""

_MODELS__INCREMENTAL_SYNC_REMOVE_ONLY_TARGET = """
{{
config(materialized='table')
}}
with source_data as (
select * from {{ ref('model_a') }}
)
{% set string_type = dbt.type_string() %}
select id
,cast(field1 as {{string_type}}) as field1
from source_data
"""

_MODELS__INCREMENTAL_SYNC_ALL_COLUMNS_TARGET = """
{{
config(materialized='table')
}}
with source_data as (
select * from {{ ref('model_a') }}
)
{% set string_type = dbt.type_string() %}
select id
,cast(field1 as {{string_type}}) as field1
--,field2
,cast(case when id <= 3 then null else field3 end as {{string_type}}) as field3
,cast(case when id <= 3 then null else field4 end as {{string_type}}) as field4
from source_data
"""


class TestBaseIncrementalUniqueKeySQLServer(BaseIncrementalUniqueKey):
pass


class TestIncrementalOnSchemaChangeSQLServer(BaseIncrementalOnSchemaChange):
@pytest.fixture(scope="class")
def models(self):
return {
"incremental_sync_remove_only.sql": _MODELS__INCREMENTAL_SYNC_REMOVE_ONLY,
"incremental_ignore.sql": _MODELS__INCREMENTAL_IGNORE,
"incremental_sync_remove_only_target.sql": _MODELS__INCREMENTAL_SYNC_REMOVE_ONLY_TARGET, # noqa: E501
"incremental_ignore_target.sql": _MODELS__INCREMENTAL_IGNORE_TARGET,
"incremental_fail.sql": _MODELS__INCREMENTAL_FAIL,
"incremental_sync_all_columns.sql": _MODELS__INCREMENTAL_SYNC_ALL_COLUMNS,
"incremental_append_new_columns_remove_one.sql": _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_REMOVE_ONE, # noqa: E501
"model_a.sql": _MODELS__A,
"incremental_append_new_columns_target.sql": _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_TARGET, # noqa: E501
"incremental_append_new_columns.sql": _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS,
"incremental_sync_all_columns_target.sql": _MODELS__INCREMENTAL_SYNC_ALL_COLUMNS_TARGET, # noqa: E501
"incremental_append_new_columns_remove_one_target.sql": _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_REMOVE_ONE_TARGET, # noqa: E501
}


class TestIncrementalPredicatesDeleteInsertSQLServer(BaseIncrementalPredicates):
pass


class TestPredicatesDeleteInsertSQLServer(BaseIncrementalPredicates):
@pytest.fixture(scope="class")
def project_config_update(self):
return {"models": {"+predicates": ["id != 2"], "+incremental_strategy": "delete+insert"}}
2 changes: 1 addition & 1 deletion tests/functional/adapter/test_seed.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
{% endfor %}
{% set col_type = col_types.get(column_name) %}
{% set col_type = 'text' if col_type and 'character varying' in col_type else col_type %}
{% set col_type = 'text' if col_type and 'varchar' in col_type else col_type %}
{% set validation_message = 'Got a column type of ' ~ col_type ~ ', expected ' ~ type %}
Expand Down

0 comments on commit 6510b76

Please sign in to comment.