diff --git a/CHANGELOG.md b/CHANGELOG.md index c116ce409..b6485742b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ ## dbt-databricks 1.3.0 (Release TBD) +### Under the hood +- Apply "Initial refactoring of incremental materialization" ([#148](https://github.com/databricks/dbt-databricks/pull/148)) + - Now dbt-databricks uses `adapter.get_incremental_strategy_macro` instead of `dbt_spark_get_incremental_sql` macro to dispatch the incremental strategy macro. The overwritten `dbt_spark_get_incremental_sql` macro will not work anymore. + ## dbt-databricks 1.2.1 (August 24, 2022) ### Features diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index 0645632f2..490de8838 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -211,6 +211,9 @@ def _get_columns_for_catalog(self, relation: DatabricksRelation) -> Iterable[Dic as_dict["column_type"] = as_dict.pop("dtype") yield as_dict + def valid_incremental_strategies(self) -> List[str]: + return ["append", "merge", "insert_overwrite"] + @contextmanager def _catalog(self, catalog: Optional[str]) -> Iterator[None]: """ diff --git a/dbt/include/databricks/macros/materializations/incremental/incremental.sql b/dbt/include/databricks/macros/materializations/incremental/incremental.sql index 2cf6c9805..200ba496f 100644 --- a/dbt/include/databricks/macros/materializations/incremental/incremental.sql +++ b/dbt/include/databricks/macros/materializations/incremental/incremental.sql @@ -6,7 +6,7 @@ {%- set grant_config = config.get('grants') -%} {%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%} - {%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%} + {%- set incremental_strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%} {%- set unique_key = config.get('unique_key', none) -%} {%- set partition_by = config.get('partition_by', none) -%} @@ -17,9 +17,8 @@ {%
set target_relation = this %} {% set existing_relation = load_relation(this) %} - {% set tmp_relation = make_temp_relation(this) %} - {% if strategy == 'insert_overwrite' and partition_by %} + {% if incremental_strategy == 'insert_overwrite' and partition_by %} {% call statement() %} set spark.sql.sources.partitionOverwriteMode = DYNAMIC {% endcall %} @@ -37,9 +36,12 @@ {% endif %} {% set build_sql = create_table_as(False, target_relation, sql) %} {% else %} - {% do run_query(create_table_as(True, tmp_relation, sql)) %} - {% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %} - {% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %} + {% set temp_relation = make_temp_relation(this) %} + {% do run_query(create_table_as(True, temp_relation, sql)) %} + {% do process_schema_changes(on_schema_change, temp_relation, existing_relation) %} + {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %} + {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key }) %} + {% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %} {% endif %} {%- call statement('main') -%} diff --git a/dbt/include/databricks/macros/materializations/incremental/strategies.sql b/dbt/include/databricks/macros/materializations/incremental/strategies.sql new file mode 100644 index 000000000..876ecf4a0 --- /dev/null +++ b/dbt/include/databricks/macros/materializations/incremental/strategies.sql @@ -0,0 +1,15 @@ +{% macro databricks__get_incremental_default_sql(arg_dict) %} + {{ return(get_incremental_merge_sql(arg_dict)) }} +{% endmacro %} + +{% macro databricks__get_incremental_append_sql(arg_dict) %} + {% do return(get_insert_into_sql(arg_dict["temp_relation"], arg_dict["target_relation"])) %} +{% endmacro %} + +{% macro databricks__get_incremental_merge_sql(arg_dict) %} + {% do 
return(get_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], dest_columns=none, predicates=none)) %} +{% endmacro %} + +{% macro databricks__get_incremental_insert_overwrite_sql(arg_dict) %} + {% do return(get_insert_overwrite_sql(arg_dict["temp_relation"], arg_dict["target_relation"])) %} +{% endmacro %}