diff --git a/CHANGELOG.md b/CHANGELOG.md
index 46c862ca5..8c10852b9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -3,6 +3,7 @@
 ### Fixes
 
 - Undo the removal of spark.sql.sources.partitionOverwriteMode = DYNAMIC ([688](https://github.com/databricks/dbt-databricks/pull/688))
+- Set spark.sql.sources.partitionOverwriteMode = STATIC on --full-refresh to ensure existing rows are removed ([697](https://github.com/databricks/dbt-databricks/pull/697))
 - Migrate to using system.information_schema to fix issue with catalog renames ([692](https://github.com/databricks/dbt-databricks/pull/692))
 - Cancel python jobs when dbt operation is canceled (thanks @gaoshihang for kicking this off!) ([693](https://github.com/databricks/dbt-databricks/pull/693))
 
diff --git a/dbt/include/databricks/macros/materializations/incremental/incremental.sql b/dbt/include/databricks/macros/materializations/incremental/incremental.sql
index 05f00a83e..46db1fbdd 100644
--- a/dbt/include/databricks/macros/materializations/incremental/incremental.sql
+++ b/dbt/include/databricks/macros/materializations/incremental/incremental.sql
@@ -20,10 +20,10 @@
 
   {%- set existing_relation = adapter.get_relation(database=this.database, schema=this.schema, identifier=this.identifier, needs_information=True) -%}
 
-  {#-- Set Overwrite Mode - does not yet work for warehouses --#}
-  {%- if incremental_strategy == 'insert_overwrite' and partition_by -%}
+  {#-- Set Overwrite Mode to STATIC for initial replace --#}
+  {%- if incremental_strategy == 'insert_overwrite' and should_full_refresh() -%}
     {%- call statement() -%}
-      set spark.sql.sources.partitionOverwriteMode = DYNAMIC
+      set spark.sql.sources.partitionOverwriteMode = STATIC
     {%- endcall -%}
   {%- endif -%}
 
@@ -53,6 +53,12 @@
     {% endif %}
     {% do apply_tags(target_relation, tags) %}
   {%- else -%}
+    {#-- Set Overwrite Mode to DYNAMIC for subsequent incremental operations --#}
+    {%- if incremental_strategy == 'insert_overwrite' and partition_by -%}
+      {%- call statement() -%}
+        set spark.sql.sources.partitionOverwriteMode = DYNAMIC
+      {%- endcall -%}
+    {%- endif -%}
     {#-- Relation must be merged --#}
     {%- set _existing_config = adapter.get_relation_config(existing_relation) -%}
     {%- set model_config = adapter.get_config_from_model(config.model) -%}
diff --git a/tests/conftest.py b/tests/conftest.py
index b8a0e0775..a6b572116 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -7,7 +7,7 @@
 
 
 def pytest_addoption(parser):
-    parser.addoption("--profile", action="store", default="databricks_uc_sql_endpoint", type=str)
+    parser.addoption("--profile", action="store", default="databricks_uc_cluster", type=str)
 
 
 # Using @pytest.mark.skip_profile('databricks_cluster') uses the 'skip_by_adapter_type'
diff --git a/tests/functional/adapter/incremental/fixtures.py b/tests/functional/adapter/incremental/fixtures.py
index f6b418d3a..0068d97b2 100644
--- a/tests/functional/adapter/incremental/fixtures.py
+++ b/tests/functional/adapter/incremental/fixtures.py
@@ -328,3 +328,30 @@ def model(dbt, spark):
     liquid_clustered_by: test2
     http_path: "{{ env_var('DBT_DATABRICKS_UC_CLUSTER_HTTP_PATH') }}"
 """
+
+replace_table = """
+{{ config(
+    materialized = 'table'
+) }}
+
+select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color
+"""
+
+replace_incremental = """
+{{ config(
+    materialized = 'incremental',
+    incremental_strategy = 'insert_overwrite',
+    partition_by = 'id'
+) }}
+
+select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
+union all
+select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color
+"""
+
+replace_expected = """id,msg,color
+1,hello,blue
+2,goodbye,red
+"""
diff --git a/tests/functional/adapter/incremental/test_incremental_replace_table.py b/tests/functional/adapter/incremental/test_incremental_replace_table.py
new file mode 100644
index 000000000..ec7f351a8
--- /dev/null
+++ b/tests/functional/adapter/incremental/test_incremental_replace_table.py
@@ -0,0 +1,21 @@
+import pytest
+from dbt.tests import util
+from tests.functional.adapter.incremental import fixtures
+
+
+@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_sql_endpoint")
+class TestIncrementalReplaceTable:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {"model.sql": fixtures.replace_table}
+
+    @pytest.fixture(scope="class")
+    def seeds(self):
+        return {"seed.csv": fixtures.replace_expected}
+
+    # Validate that when we replace an existing table, no extra partitions are left behind
+    def test_replace(self, project):
+        util.run_dbt(["build"])
+        util.write_file(fixtures.replace_incremental, "models", "model.sql")
+        util.run_dbt(["run", "--full-refresh"])
+        util.check_relations_equal(project.adapter, ["model", "seed"])