Fix insert_overwrite replacement edge case (#697)
benc-db authored Jun 7, 2024
1 parent 7fd525c commit f84a420
Showing 5 changed files with 59 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -3,6 +3,7 @@
### Fixes

- Undo the removal of spark.sql.sources.partitionOverwriteMode = DYNAMIC ([688](https://github.com/databricks/dbt-databricks/pull/688))
+ - Set spark.sql.sources.partitionOverwriteMode = STATIC on --full-refresh to ensure existing rows are removed ([697](https://github.com/databricks/dbt-databricks/pull/697))
- Migrate to using system.information_schema to fix issue with catalog renames ([692](https://github.com/databricks/dbt-databricks/pull/692))
- Cancel python jobs when dbt operation is canceled (thanks @gaoshihang for kicking this off!) ([693](https://github.com/databricks/dbt-databricks/pull/693))

@@ -20,10 +20,10 @@
{%- set existing_relation = adapter.get_relation(database=this.database, schema=this.schema, identifier=this.identifier, needs_information=True) -%}


- {#-- Set Overwrite Mode - does not yet work for warehouses --#}
- {%- if incremental_strategy == 'insert_overwrite' and partition_by -%}
+ {#-- Set Overwrite Mode to STATIC for initial replace --#}
+ {%- if incremental_strategy == 'insert_overwrite' and should_full_refresh() -%}
{%- call statement() -%}
- set spark.sql.sources.partitionOverwriteMode = DYNAMIC
+ set spark.sql.sources.partitionOverwriteMode = STATIC
{%- endcall -%}
{%- endif -%}

@@ -53,6 +53,12 @@
{% endif %}
{% do apply_tags(target_relation, tags) %}
{%- else -%}
+ {#-- Set Overwrite Mode to DYNAMIC for subsequent incremental operations --#}
+ {%- if incremental_strategy == 'insert_overwrite' and partition_by -%}
+ {%- call statement() -%}
+ set spark.sql.sources.partitionOverwriteMode = DYNAMIC
+ {%- endcall -%}
+ {%- endif -%}
{#-- Relation must be merged --#}
{%- set _existing_config = adapter.get_relation_config(existing_relation) -%}
{%- set model_config = adapter.get_config_from_model(config.model) -%}
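For context, a minimal sketch of the two overwrite modes (assumed Spark SQL semantics; the table and rows below are hypothetical, not part of this commit):

-- STATIC (Spark's default, now set on --full-refresh): an INSERT OVERWRITE
-- without a PARTITION clause replaces the entire table, so rows from
-- partitions absent in the new data are removed.
SET spark.sql.sources.partitionOverwriteMode = STATIC;
INSERT OVERWRITE some_model SELECT 1 AS id, 'hello' AS msg, 'blue' AS color;

-- DYNAMIC (set for subsequent incremental runs): the same statement replaces
-- only the partitions present in the incoming rows; other partitions survive.
SET spark.sql.sources.partitionOverwriteMode = DYNAMIC;
INSERT OVERWRITE some_model SELECT 2 AS id, 'goodbye' AS msg, 'red' AS color;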
2 changes: 1 addition & 1 deletion tests/conftest.py
@@ -7,7 +7,7 @@


def pytest_addoption(parser):
parser.addoption("--profile", action="store", default="databricks_uc_sql_endpoint", type=str)
parser.addoption("--profile", action="store", default="databricks_uc_cluster", type=str)


# Using @pytest.mark.skip_profile('databricks_cluster') uses the 'skip_by_adapter_type'
27 changes: 27 additions & 0 deletions tests/functional/adapter/incremental/fixtures.py
@@ -328,3 +328,30 @@ def model(dbt, spark):
liquid_clustered_by: test2
http_path: "{{ env_var('DBT_DATABRICKS_UC_CLUSTER_HTTP_PATH') }}"
"""

replace_table = """
{{ config(
    materialized = 'table'
) }}
select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
union all
select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color
"""

replace_incremental = """
{{ config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = 'id'
) }}
select cast(1 as bigint) as id, 'hello' as msg, 'blue' as color
union all
select cast(2 as bigint) as id, 'goodbye' as msg, 'red' as color
"""

replace_expected = """id,msg,color
1,hello,blue
2,goodbye,red
"""
@@ -0,0 +1,21 @@
import pytest
from dbt.tests import util
from tests.functional.adapter.incremental import fixtures


@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_sql_endpoint")
class TestIncrementalReplaceTable:
    @pytest.fixture(scope="class")
    def models(self):
        return {"model.sql": fixtures.replace_table}

    @pytest.fixture(scope="class")
    def seeds(self):
        return {"seed.csv": fixtures.replace_expected}

    # Validate that when we replace an existing table, no extra partitions are left behind
    def test_replace(self, project):
        util.run_dbt(["build"])
        util.write_file(fixtures.replace_incremental, "models", "model.sql")
        util.run_dbt(["run", "--full-refresh"])
        util.check_relations_equal(project.adapter, ["model", "seed"])

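A hedged illustration of the stale-row scenario this test guards against (the table name and the leftover row are hypothetical, not from this commit):

-- Suppose a previous build of the relation also left behind a row with id = 3.
-- Under DYNAMIC mode, the full-refresh overwrite rewrites only the id=1 and
-- id=2 partitions, so the stale id=3 row survives and the relation no longer
-- matches the seed. Under STATIC mode, as now set on --full-refresh, the same
-- statement replaces the whole table:
SET spark.sql.sources.partitionOverwriteMode = STATIC;
INSERT OVERWRITE some_model
SELECT CAST(1 AS BIGINT) AS id, 'hello' AS msg, 'blue' AS color
UNION ALL
SELECT CAST(2 AS BIGINT) AS id, 'goodbye' AS msg, 'red' AS color;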