From e0a786b21301b873b1b9425962b777f3c3ccd10a Mon Sep 17 00:00:00 2001 From: Peter Allen Webb Date: Wed, 18 Dec 2024 20:30:39 -0500 Subject: [PATCH] Ensure new 'deletion' records get a new scd_id --- .../simple_snapshot/new_record_mode.py | 22 +++++++++++++++++-- .../materializations/snapshots/helpers.sql | 18 ++++++++++++--- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py index f2f0f3b5..685fabd7 100644 --- a/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py +++ b/dbt-tests-adapter/dbt/tests/adapter/simple_snapshot/new_record_mode.py @@ -2,6 +2,7 @@ from dbt.tests.util import check_relations_equal, run_dbt +# Snapshot source data for the tests in this file _seed_new_record_mode = """ BEGIN @@ -176,8 +177,14 @@ where id >= 10 and id <= 20; """ +# SQL to delete a record from the snapshot source data _delete_sql = """ -delete from {schema}.seed where id = 1 +delete from {database}.{schema}.seed where id = 1 +""" + +# If the deletion worked correctly, this should return two rows, with one of them representing the deletion. +_delete_check_sql = """ +select dbt_valid_to, dbt_scd_id, dbt_is_deleted from {schema}.snapshot_actual where id = 1 """ @@ -229,4 +236,15 @@ def test_snapshot_new_record_mode( results = run_dbt(["snapshot"]) assert len(results) == 1 - # TODO: Further validate results. + check_result = project.run_sql(_delete_check_sql, fetch="all") + valid_to = 0 + scd_id = 1 + is_deleted = 2 + assert len(check_result) == 2 + assert sum( + [1 for c in check_result if c[valid_to] is None and c[scd_id] is not None and c[is_deleted] == "True"] + ) == 1 + assert sum( + [1 for c in check_result if c[valid_to] is not None and c[scd_id] is not None and c[is_deleted] == "False"] + ) == 1 + assert check_result[0][scd_id] != check_result[1][scd_id] \ No newline at end of file diff --git a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql index 33492cc9..6454877c 100644 --- a/dbt/include/global_project/macros/materializations/snapshots/helpers.sql +++ b/dbt/include/global_project/macros/materializations/snapshots/helpers.sql @@ -40,7 +40,7 @@ {% macro default__snapshot_staging_table(strategy, source_sql, target_relation) -%} {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %} - + {% set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) %} with snapshot_query as ( {{ source_sql }} @@ -169,12 +169,13 @@ {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }}, {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }}, snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }}, - snapshotted_data.{{ columns.dbt_scd_id }}, + {{ new_scd_id }} as {{ columns.dbt_scd_id }}, 'True' as {{ columns.dbt_is_deleted }} from snapshotted_data left join deletes_source_data as source_data on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }} - where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + where {{ unique_key_is_null(strategy.unique_key, "source_data") }} + ) {%- endif %} @@ -272,6 +273,17 @@ {% endif %} {% endmacro %} +{% macro unique_key_reverse(unique_key) %} + {% if unique_key | is_list %} + {% for key in unique_key %} + dbt_unique_key_{{ loop.index }} as {{ key }} + {%- if not loop.last %} , {%- endif %} + {% endfor %} + {% else %} + dbt_unique_key as {{ unique_key }} + {% endif %} +{% endmacro %} + {% macro unique_key_join_on(unique_key, identifier, from_identifier) %} {% if unique_key | is_list %}