Apply "Initial refactoring of incremental materialization" (#148)
### Description

Applies "Initial refactoring of incremental materialization" (dbt-labs/dbt-core#5359).

The incremental materialization now dispatches the strategy macro through `adapter.get_incremental_strategy_macro` instead of dbt-spark's `dbt_spark_get_incremental_sql` macro. As a result, a user override of `dbt_spark_get_incremental_sql` will no longer take effect.
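As a rough mental model of the new dispatch (an illustrative sketch only, not dbt-core's actual implementation; the macro names and prefix order are stand-ins), the adapter resolves a macro named `get_incremental_<strategy>_sql`, preferring the most specific adapter-prefixed override:

```python
# Simplified model of adapter.get_incremental_strategy_macro:
# resolve a strategy name to a macro callable from the Jinja context,
# trying adapter-specific prefixes before the default.
# Illustrative sketch only -- not dbt-core's real code.

def make_context():
    # Stand-ins for macros that would live in the Jinja rendering context.
    return {
        "databricks__get_incremental_merge_sql": lambda args: "merge sql",
        "spark__get_incremental_append_sql": lambda args: "append sql",
        "default__get_incremental_default_sql": lambda args: "default sql",
    }

def get_incremental_strategy_macro(context, strategy,
                                   prefixes=("databricks", "spark", "default")):
    # Look up get_incremental_<strategy>_sql, most specific prefix first.
    for prefix in prefixes:
        name = f"{prefix}__get_incremental_{strategy}_sql"
        if name in context:
            return context[name]
    raise ValueError(f"no macro found for incremental strategy: {strategy}")

context = make_context()
print(get_incremental_strategy_macro(context, "merge")({}))   # merge sql
print(get_incremental_strategy_macro(context, "append")({}))  # append sql
```

Because the lookup goes through the adapter rather than a hard-coded macro name, each adapter can ship its own `databricks__get_incremental_*_sql` overrides, as the new macro file in this commit does.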

Co-authored-by: allisonwang-db <[email protected]>
ueshin and allisonwang-db authored Aug 25, 2022
1 parent 44ec7d9 commit 74e5ce2
Showing 4 changed files with 30 additions and 6 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
 ## dbt-databricks 1.3.0 (Release TBD)
 
+### Under the hood
+- Apply "Initial refactoring of incremental materialization" ([#148](https://github.com/databricks/dbt-databricks/pull/148))
+- Now dbt-databricks uses `adapter.get_incremental_strategy_macro` instead of `dbt_spark_get_incremental_sql` macro to dispatch the incremental strategy macro. The overwritten `dbt_spark_get_incremental_sql` macro will not work anymore.
+
 ## dbt-databricks 1.2.1 (August 24, 2022)
 
 ### Features
3 changes: 3 additions & 0 deletions dbt/adapters/databricks/impl.py
@@ -211,6 +211,9 @@ def _get_columns_for_catalog(self, relation: DatabricksRelation) -> Iterable[Dic
             as_dict["column_type"] = as_dict.pop("dtype")
             yield as_dict
 
+    def valid_incremental_strategies(self) -> List[str]:
+        return ["append", "merge", "insert_overwrite"]
+
     @contextmanager
     def _catalog(self, catalog: Optional[str]) -> Iterator[None]:
         """
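The new `valid_incremental_strategies` method feeds dbt's strategy validation. A minimal sketch of how such a check could work (simplified; the `validate_strategy` helper and error message here are hypothetical, not dbt-core's real code):

```python
# Sketch: validating a configured strategy against the adapter's
# valid_incremental_strategies() list. Illustrative only.
from typing import List

class DatabricksAdapterSketch:
    def valid_incremental_strategies(self) -> List[str]:
        # Mirrors the list added to impl.py in this commit.
        return ["append", "merge", "insert_overwrite"]

def validate_strategy(adapter, strategy: str) -> str:
    valid = adapter.valid_incremental_strategies()
    if strategy not in valid:
        raise ValueError(
            f"Invalid incremental strategy {strategy!r}; expected one of {valid}"
        )
    return strategy

adapter = DatabricksAdapterSketch()
print(validate_strategy(adapter, "merge"))  # merge
```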
@@ -6,7 +6,7 @@
 {%- set grant_config = config.get('grants') -%}
 
 {%- set file_format = dbt_spark_validate_get_file_format(raw_file_format) -%}
-{%- set strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}
+{%- set incremental_strategy = dbt_spark_validate_get_incremental_strategy(raw_strategy, file_format) -%}
 
 {%- set unique_key = config.get('unique_key', none) -%}
 {%- set partition_by = config.get('partition_by', none) -%}
@@ -17,9 +17,8 @@
 
 {% set target_relation = this %}
 {% set existing_relation = load_relation(this) %}
-{% set tmp_relation = make_temp_relation(this) %}
 
-{% if strategy == 'insert_overwrite' and partition_by %}
+{% if incremental_strategy == 'insert_overwrite' and partition_by %}
   {% call statement() %}
     set spark.sql.sources.partitionOverwriteMode = DYNAMIC
   {% endcall %}
@@ -37,9 +36,12 @@
   {% endif %}
   {% set build_sql = create_table_as(False, target_relation, sql) %}
 {% else %}
-  {% do run_query(create_table_as(True, tmp_relation, sql)) %}
-  {% do process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
-  {% set build_sql = dbt_spark_get_incremental_sql(strategy, tmp_relation, target_relation, unique_key) %}
+  {% set temp_relation = make_temp_relation(this) %}
+  {% do run_query(create_table_as(True, temp_relation, sql)) %}
+  {% do process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
+  {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
+  {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key }) %}
+  {% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}
 {% endif %}
 
 {%- call statement('main') -%}
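The materialization above no longer passes positional arguments to a fixed macro; it fetches the strategy macro and calls it with a single argument dictionary. In Python terms, the calling convention looks roughly like this (the dict keys come from the diff; the merge macro body is a simplified stand-in, not dbt's real SQL generation):

```python
# Sketch of the new calling convention: each strategy macro receives one
# dict carrying target_relation, temp_relation, and unique_key.
# The macro body below is a stand-in for illustration only.

def get_incremental_merge_sql(arg_dict):
    # Show which pieces of the arg dict a strategy macro consumes.
    return (
        f"MERGE INTO {arg_dict['target_relation']} "
        f"USING {arg_dict['temp_relation']} "
        f"ON unique_key = {arg_dict['unique_key']}"
    )

strategy_arg_dict = {
    "target_relation": "analytics.events",
    "temp_relation": "analytics.events__dbt_tmp",
    "unique_key": "event_id",
}
build_sql = get_incremental_merge_sql(strategy_arg_dict)
print(build_sql)
```

Bundling the arguments into one dict lets every strategy macro share a single signature, which is what makes the name-based dispatch possible.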
@@ -0,0 +1,15 @@
+{% macro databricks__get_incremental_default_sql(arg_dict) %}
+  {{ return(get_incremental_merge_sql(arg_dict)) }}
+{% endmacro %}
+
+{% macro databricks__get_incremental_append_sql(arg_dict) %}
+  {% do return(get_insert_into_sql(arg_dict["temp_relation"], arg_dict["target_relation"])) %}
+{% endmacro %}
+
+{% macro databricks__get_incremental_merge_sql(arg_dict) %}
+  {% do return(get_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], dest_columns=none, predicates=none)) %}
+{% endmacro %}
+
+{% macro databricks__get_incremental_insert_overwrite_sql(arg_dict) %}
+  {% do return(get_insert_overwrite_sql(arg_dict["temp_relation"], arg_dict["target_relation"])) %}
+{% endmacro %}
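Each of the four macros above is a thin wrapper that unpacks the `arg_dict` and delegates to an underlying SQL helper. A Python analogue of the append wrapper (the helper body is an illustrative stand-in; the real `get_insert_into_sql` macro also expands the column list):

```python
# Python analogue of databricks__get_incremental_append_sql:
# unpack the arg_dict and delegate to an insert-into helper.
# Helper body is a simplified stand-in for illustration.

def get_insert_into_sql(source_relation, target_relation):
    # Simplified: the real macro enumerates columns rather than using *.
    return f"INSERT INTO {target_relation} SELECT * FROM {source_relation}"

def get_incremental_append_sql(arg_dict):
    return get_insert_into_sql(arg_dict["temp_relation"],
                               arg_dict["target_relation"])

sql = get_incremental_append_sql(
    {"temp_relation": "tmp_rel", "target_relation": "tgt_rel", "unique_key": None}
)
print(sql)  # INSERT INTO tgt_rel SELECT * FROM tmp_rel
```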