diff --git a/dbt/adapters/sqlserver/sql_server_column.py b/dbt/adapters/sqlserver/sql_server_column.py index 93a3924f..4f2e01a4 100644 --- a/dbt/adapters/sqlserver/sql_server_column.py +++ b/dbt/adapters/sqlserver/sql_server_column.py @@ -1,4 +1,4 @@ -from typing import ClassVar, Dict +from typing import Any, ClassVar, Dict from dbt.adapters.base import Column @@ -11,3 +11,10 @@ class SQLServerColumn(Column): "INTEGER": "INT", "BOOLEAN": "BIT", } + + @classmethod + def string_type(cls, size: int) -> str: + return f"varchar({size if size > 0 else 'MAX'})" + + def literal(self, value: Any) -> str: + return "cast('{}' as {})".format(value, self.data_type) diff --git a/dbt/include/sqlserver/macros/adapters/columns.sql b/dbt/include/sqlserver/macros/adapters/columns.sql index d8c58d40..2e586f95 100644 --- a/dbt/include/sqlserver/macros/adapters/columns.sql +++ b/dbt/include/sqlserver/macros/adapters/columns.sql @@ -46,12 +46,31 @@ {%- set tmp_column = column_name + "__dbt_alter" -%} {% call statement('alter_column_type') -%} - - alter {{ relation.type }} {{ relation }} add {{ tmp_column }} {{ new_column_type }}; - update {{ relation }} set {{ tmp_column }} = {{ column_name }}; - alter {{ relation.type }} {{ relation }} drop column {{ column_name }}; + alter {{ relation.type }} {{ relation }} add "{{ tmp_column }}" {{ new_column_type }}; + {%- endcall -%} + {% call statement('alter_column_type') -%} + update {{ relation }} set "{{ tmp_column }}" = "{{ column_name }}"; + {%- endcall -%} + {% call statement('alter_column_type') -%} + alter {{ relation.type }} {{ relation }} drop column "{{ column_name }}"; + {%- endcall -%} + {% call statement('alter_column_type') -%} exec sp_rename '{{ relation | replace('"', '') }}.{{ tmp_column }}', '{{ column_name }}', 'column' - {%- endcall -%} {% endmacro %} + + +{% macro sqlserver__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %} + {% call statement('add_drop_columns') -%} + {% if add_columns %} + alter {{ relation.type }} {{ relation }} + add {% for column in add_columns %}"{{ column.name }}" {{ column.data_type }}{{ ', ' if not loop.last }}{% endfor %}; + {% endif %} + + {% if remove_columns %} + alter {{ relation.type }} {{ relation }} + drop column {% for column in remove_columns %}"{{ column.name }}"{{ ',' if not loop.last }}{% endfor %}; + {% endif %} + {%- endcall -%} +{% endmacro %} diff --git a/dbt/include/sqlserver/macros/materializations/models/incremental/merge.sql b/dbt/include/sqlserver/macros/materializations/models/incremental/merge.sql index 2f0567c7..38202a9f 100644 --- a/dbt/include/sqlserver/macros/materializations/models/incremental/merge.sql +++ b/dbt/include/sqlserver/macros/materializations/models/incremental/merge.sql @@ -5,49 +5,53 @@ https://getdbt.slack.com/archives/C50NEBJGG/p1636045535056600 #} -{% macro sqlserver__get_merge_sql(target, source, unique_key, dest_columns, predicates) %} - {{ default__get_merge_sql(target, source, unique_key, dest_columns, predicates) }}; +{% macro sqlserver__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) %} + {{ default__get_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) }}; {% endmacro %} -{% macro sqlserver__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates) %} - {% if incremental_predicates %} - {{ exceptions.raise_not_implemented('incremental_predicates are not implemented in dbt-sqlserver') }} - {% endif %} +{% macro sqlserver__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) %} + {{ default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) }}; +{% endmacro %} - {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} +{% macro sqlserver__get_delete_insert_merge_sql(target, source, unique_key, dest_columns, incremental_predicates=none) %} + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} - {% if unique_key %} - {% if unique_key is sequence and unique_key is not string %} - delete from {{ target }} - where exists ( - SELECT NULL - FROM - {{ source }} - WHERE + {% if unique_key %} + {% if unique_key is sequence and unique_key is not string %} + delete from {{ target }} + where exists ( + select null + from {{ source }} + where {% for key in unique_key %} {{ source }}.{{ key }} = {{ target }}.{{ key }} {{ "and " if not loop.last }} {% endfor %} - ); - {% else %} - delete from {{ target }} - where ( - {{ unique_key }}) in ( - select ({{ unique_key }}) - from {{ source }} - ); - {% endif %} - {% endif %} - - insert into {{ target }} ({{ dest_cols_csv }}) - ( - select {{ dest_cols_csv }} - from {{ source }} - ) - -{% endmacro %} - -{% macro sqlserver__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) %} - {{ default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) }}; + ) + {% if incremental_predicates %} + {% for predicate in incremental_predicates %} + and {{ predicate }} + {% endfor %} + {% endif %}; + {% else %} + delete from {{ target }} + where ( + {{ unique_key }}) in ( + select ({{ unique_key }}) + from {{ source }} + ) + {%- if incremental_predicates %} + {% for predicate in incremental_predicates %} + and {{ predicate }} + {% endfor %} + {%- endif -%}; + {% endif %} + {% endif %} + + insert into {{ target }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ source }} + ) {% endmacro %} diff --git a/tests/functional/adapter/test_incremental.py b/tests/functional/adapter/test_incremental.py index 873a6cfd..c4d14c2d 100644 --- a/tests/functional/adapter/test_incremental.py +++ b/tests/functional/adapter/test_incremental.py @@ -1,5 +1,120 @@ +import pytest +from dbt.tests.adapter.incremental.fixtures import ( + _MODELS__A, + _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS, + _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_REMOVE_ONE, + _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_REMOVE_ONE_TARGET, + _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_TARGET, + _MODELS__INCREMENTAL_FAIL, + _MODELS__INCREMENTAL_IGNORE_TARGET, + _MODELS__INCREMENTAL_SYNC_ALL_COLUMNS, + _MODELS__INCREMENTAL_SYNC_REMOVE_ONLY, +) +from dbt.tests.adapter.incremental.test_incremental_on_schema_change import ( + BaseIncrementalOnSchemaChange, +) +from dbt.tests.adapter.incremental.test_incremental_predicates import BaseIncrementalPredicates from dbt.tests.adapter.incremental.test_incremental_unique_id import BaseIncrementalUniqueKey +_MODELS__INCREMENTAL_IGNORE = """ +{{ + config( + materialized='incremental', + unique_key='id', + on_schema_change='ignore' + ) +}} + +WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) + +{% if is_incremental() %} + +SELECT + id, + field1, + field2, + field3, + field4 +FROM source_data +WHERE id NOT IN (SELECT id from {{ this }} ) + +{% else %} + +SELECT TOP 3 id, field1, field2 FROM source_data + +{% endif %} +""" + +_MODELS__INCREMENTAL_SYNC_REMOVE_ONLY_TARGET = """ +{{ + config(materialized='table') +}} + +with source_data as ( + + select * from {{ ref('model_a') }} + +) + +{% set string_type = dbt.type_string() %} + +select id + ,cast(field1 as {{string_type}}) as field1 + +from source_data +""" + +_MODELS__INCREMENTAL_SYNC_ALL_COLUMNS_TARGET = """ +{{ + config(materialized='table') +}} + +with source_data as ( + + select * from {{ ref('model_a') }} + +) + +{% set string_type = dbt.type_string() %} + +select id + ,cast(field1 as {{string_type}}) as field1 + --,field2 + ,cast(case when id <= 3 then null else field3 end as {{string_type}}) as field3 + ,cast(case when id <= 3 then null else field4 end as {{string_type}}) as field4 + +from source_data +""" + class TestBaseIncrementalUniqueKeySQLServer(BaseIncrementalUniqueKey): pass + + +class TestIncrementalOnSchemaChangeSQLServer(BaseIncrementalOnSchemaChange): + @pytest.fixture(scope="class") + def models(self): + return { + "incremental_sync_remove_only.sql": _MODELS__INCREMENTAL_SYNC_REMOVE_ONLY, + "incremental_ignore.sql": _MODELS__INCREMENTAL_IGNORE, + "incremental_sync_remove_only_target.sql": _MODELS__INCREMENTAL_SYNC_REMOVE_ONLY_TARGET, # noqa: E501 + "incremental_ignore_target.sql": _MODELS__INCREMENTAL_IGNORE_TARGET, + "incremental_fail.sql": _MODELS__INCREMENTAL_FAIL, + "incremental_sync_all_columns.sql": _MODELS__INCREMENTAL_SYNC_ALL_COLUMNS, + "incremental_append_new_columns_remove_one.sql": _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_REMOVE_ONE, # noqa: E501 + "model_a.sql": _MODELS__A, + "incremental_append_new_columns_target.sql": _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_TARGET, # noqa: E501 + "incremental_append_new_columns.sql": _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS, + "incremental_sync_all_columns_target.sql": _MODELS__INCREMENTAL_SYNC_ALL_COLUMNS_TARGET, # noqa: E501 + "incremental_append_new_columns_remove_one_target.sql": _MODELS__INCREMENTAL_APPEND_NEW_COLUMNS_REMOVE_ONE_TARGET, # noqa: E501 + } + + +class TestIncrementalPredicatesDeleteInsertSQLServer(BaseIncrementalPredicates): + pass + + +class TestPredicatesDeleteInsertSQLServer(BaseIncrementalPredicates): + @pytest.fixture(scope="class") + def project_config_update(self): + return {"models": {"+predicates": ["id != 2"], "+incremental_strategy": "delete+insert"}} diff --git a/tests/functional/adapter/test_seed.py b/tests/functional/adapter/test_seed.py index 0b08e91a..8157566c 100644 --- a/tests/functional/adapter/test_seed.py +++ b/tests/functional/adapter/test_seed.py @@ -49,7 +49,7 @@ {% endfor %} {% set col_type = col_types.get(column_name) %} - {% set col_type = 'text' if col_type and 'character varying' in col_type else col_type %} + {% set col_type = 'text' if col_type and 'varchar' in col_type else col_type %} {% set validation_message = 'Got a column type of ' ~ col_type ~ ', expected ' ~ type %}