From 0aa213f0aae81b586ee2fe3cf8d9d3ba47b63178 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Tue, 7 Jun 2022 17:13:31 -0400 Subject: [PATCH 01/13] Initial refactoring of incremental materialization --- core/dbt/adapters/base/impl.py | 31 ++++++++++++ core/dbt/contracts/graph/model_config.py | 1 + .../models/incremental/incremental.sql | 8 +++- .../models/incremental/merge.sql | 12 ++--- .../models/incremental/strategies.sql | 48 +++++++++++++++++++ .../functional/artifacts/expected_manifest.py | 3 ++ tests/functional/list/test_list.py | 9 +++- .../materializations/test_incremental.py | 40 ++++++++++++++++ 8 files changed, 144 insertions(+), 8 deletions(-) create mode 100644 core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql create mode 100644 tests/functional/materializations/test_incremental.py diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index aef00e80d4b..661abd0f32c 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -1125,6 +1125,37 @@ def get_rows_different_sql( return sql + def validate_incremental_strategy(self): + return ["append", "delete+insert"] + + @available.parse_none + def get_incremental_strategy_macro(self, strategy: str): + # Construct macro_name from strategy name + if strategy is None: + strategy = "default" + strategy = strategy.replace("+", "_") + macro_name = f"get_incremental_{strategy}_sql" + # get a macro and return it + manifest = self._macro_manifest + macro = manifest.find_macro_by_name(macro_name, self.config.project_name, None) + if not macro: + raise RuntimeException( + 'dbt could not find a macro with the name "{}" in {}'.format( + macro_name, self.config.project_name + ) + ) + + from dbt.context.providers import generate_runtime_macro_context + + macro_context = generate_runtime_macro_context( + macro=macro, + config=self.config, + manifest=manifest, # type: ignore[arg-type] + package_name=self.config.project_name, + ) + macro_function = MacroGenerator(macro, macro_context) + return macro_function + COLUMNS_EQUAL_SQL = """ with diff_count as ( diff --git a/core/dbt/contracts/graph/model_config.py b/core/dbt/contracts/graph/model_config.py index 10e84921272..770a0aa30e4 100644 --- a/core/dbt/contracts/graph/model_config.py +++ b/core/dbt/contracts/graph/model_config.py @@ -433,6 +433,7 @@ class NodeConfig(NodeAndTestConfig): # Note: if any new fields are added with MergeBehavior, also update the # 'mergebehavior' dictionary materialized: str = "view" + incremental_strategy: Optional[str] = None persist_docs: Dict[str, Any] = field(default_factory=dict) post_hook: List[Hook] = field( default_factory=list, diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql index 1cb316b1ab0..df13b62c037 100644 --- a/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql +++ b/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql @@ -45,7 +45,13 @@ {% if not dest_columns %} {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} {% endif %} - {% set build_sql = get_delete_insert_merge_sql(target_relation, temp_relation, unique_key, dest_columns) %} + + {#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#} + {% set incremental_strategy = config.get('incremental_strategy', 'default') %} + {% set incremental_predicates = config.get('incremental_predicates', none) %} + {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(incremental_strategy) %} + {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'predicates': incremental_predicates }) %} + {% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %} {% endif %} diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql index 563cae960e9..c5f2bb83bde 100644 --- a/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql +++ b/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql @@ -26,7 +26,7 @@ {% do predicates.append('FALSE') %} {% endif %} - {{ sql_header if sql_header is not none }} + {# sql_header if sql_header is not none #} merge into {{ target }} as DBT_INTERNAL_DEST using {{ source }} as DBT_INTERNAL_SOURCE @@ -75,7 +75,7 @@ ); {% endif %} - {% endif %} + {% endif %} insert into {{ target }} ({{ dest_cols_csv }}) ( @@ -86,16 +86,16 @@ {%- endmacro %} -{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header=false) -%} - {{ adapter.dispatch('get_insert_overwrite_merge_sql', 'dbt')(target, source, dest_columns, predicates, include_sql_header) }} +{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates) -%} + {{ adapter.dispatch('get_insert_overwrite_merge_sql', 'dbt')(target, source, dest_columns, predicates) }} {%- endmacro %} -{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%} +{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates) -%} {%- set predicates = [] if predicates is none else [] + predicates -%} {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} {%- set sql_header = config.get('sql_header', none) -%} - {{ sql_header if sql_header is not none and include_sql_header }} + {{ sql_header if sql_header is not none }} merge into {{ target }} as DBT_INTERNAL_DEST using {{ source }} as DBT_INTERNAL_SOURCE diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql new file mode 100644 index 00000000000..4719c134bdb --- /dev/null +++ b/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql @@ -0,0 +1,48 @@ +{% macro get_incremental_append_sql(arg_dict) %} + + {% do return(get_insert_into_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["dest_columns"])) %} + +{% endmacro %} + + +{# snowflake #} +{% macro get_incremental_delete_insert_sql(arg_dict) %} + + {% do return(get_delete_insert_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"])) %} + +{% endmacro %} + + +{# snowflake, bigquery, spark #} +{% macro get_incremental_merge_sql(arg_dict) %} + + {% do return(get_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"])) %} + +{% endmacro %} + + +{% macro get_incremental_insert_overwrite_sql(arg_dict) %} + + {% do return(get_insert_overwrite_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["dest_columns"], arg_dict["predicates"])) %} + +{% endmacro %} + + +{% macro get_incremental_default_sql(arg_dict) %} + + {% do return(get_incremental_append_sql(arg_dict)) %} + +{% endmacro %} + + +{% macro get_insert_into_sql(target_relation, temp_relation, dest_columns) %} + + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} + + insert into {{ target_relation }} ({{ dest_cols_csv }}) + ( + select {{ dest_cols_csv }} + from {{ temp_relation }} + ) + +{% endmacro %} diff --git a/tests/functional/artifacts/expected_manifest.py b/tests/functional/artifacts/expected_manifest.py index 5a5916d3ccf..69f6c0a3679 100644 --- a/tests/functional/artifacts/expected_manifest.py +++ b/tests/functional/artifacts/expected_manifest.py @@ -31,6 +31,7 @@ def get_rendered_model_config(**updates): "meta": {}, "unique_key": None, "grants": {}, + "incremental_strategy": None, } result.update(updates) return result @@ -59,6 +60,7 @@ def get_rendered_seed_config(**updates): "meta": {}, "unique_key": None, "grants": {}, + "incremental_strategy": None, } result.update(updates) return result @@ -91,6 +93,7 @@ def get_rendered_snapshot_config(**updates): "target_schema": None, "meta": {}, "grants": {}, + "incremental_strategy": None, } result.update(updates) return result diff --git a/tests/functional/list/test_list.py b/tests/functional/list/test_list.py index b360a6b4d50..75b8fe2439a 100644 --- a/tests/functional/list/test_list.py +++ b/tests/functional/list/test_list.py @@ -91,6 +91,7 @@ def expect_snapshot_output(self, project): "on_schema_change": "ignore", "meta": {}, "grants": {}, + "incremental_strategy": None, }, "unique_id": "snapshot.test.my_snapshot", "original_file_path": normalize("snapshots/snapshot.sql"), @@ -127,6 +128,7 @@ def expect_analyses_output(self): "meta": {}, "unique_key": None, "grants": {}, + "incremental_strategy": None, }, "unique_id": "analysis.test.a", "original_file_path": normalize("analyses/a.sql"), @@ -164,6 +166,7 @@ def expect_model_output(self): "alias": None, "meta": {}, "grants": {}, + "incremental_strategy": None, }, "original_file_path": normalize("models/ephemeral.sql"), "unique_id": "model.test.ephemeral", @@ -190,12 +193,12 @@ def expect_model_output(self): "full_refresh": None, "unique_key": None, "on_schema_change": "ignore", - "incremental_strategy": "delete+insert", "database": None, "schema": None, "alias": None, "meta": {}, "grants": {}, + "incremental_strategy": "delete+insert", }, "original_file_path": normalize("models/incremental.sql"), "unique_id": "model.test.incremental", @@ -224,6 +227,7 @@ def expect_model_output(self): "alias": None, "meta": {}, "grants": {}, + "incremental_strategy": None, }, "original_file_path": normalize("models/sub/inner.sql"), "unique_id": "model.test.inner", @@ -252,6 +256,7 @@ def expect_model_output(self): "alias": None, "meta": {}, "grants": {}, + "incremental_strategy": None, }, "original_file_path": normalize("models/outer.sql"), "unique_id": "model.test.outer", @@ -295,6 +300,7 @@ def expect_model_ephemeral_output(self): "alias": None, "meta": {}, "grants": {}, + "incremental_strategy": None, }, "unique_id": "model.test.ephemeral", "original_file_path": normalize("models/ephemeral.sql"), @@ -357,6 +363,7 @@ def expect_seed_output(self): "alias": None, "meta": {}, "grants": {}, + "incremental_strategy": None, }, "unique_id": "seed.test.seed", "original_file_path": normalize("seeds/seed.csv"), diff --git a/tests/functional/materializations/test_incremental.py b/tests/functional/materializations/test_incremental.py new file mode 100644 index 00000000000..a70da8cf7ba --- /dev/null +++ b/tests/functional/materializations/test_incremental.py @@ -0,0 +1,40 @@ +import pytest +from dbt.tests.util import run_dbt + + +my_model_sql = """ + select 1 as fun +""" + + +@pytest.fixture(scope="class") +def models(): + return {"my_model.sql": my_model_sql} + + +def test_basic(project): + results = run_dbt(["run"]) + assert len(results) == 1 + + macro_func = project.adapter.get_incremental_strategy_macro("default") + assert macro_func + assert type(macro_func).__name__ == "MacroGenerator" + + macro_func = project.adapter.get_incremental_strategy_macro("append") + assert macro_func + assert type(macro_func).__name__ == "MacroGenerator" + + macro_func = project.adapter.get_incremental_strategy_macro("delete+insert") + assert macro_func + assert type(macro_func).__name__ == "MacroGenerator" + + # Note: the following 2 strategies aren't actually supported on Postgres, + # so will probably need to be removed before final version. + # Leaving here while developing + macro_func = project.adapter.get_incremental_strategy_macro("merge") + assert macro_func + assert type(macro_func).__name__ == "MacroGenerator" + + macro_func = project.adapter.get_incremental_strategy_macro("insert_overwrite") + assert macro_func + assert type(macro_func).__name__ == "MacroGenerator" From 0dc34fd775f90d42f7ed556db1d2e33bf9183a8b Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Fri, 10 Jun 2022 10:56:54 -0400 Subject: [PATCH 02/13] Changie --- .changes/unreleased/Features-20220610-105647.yaml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changes/unreleased/Features-20220610-105647.yaml diff --git a/.changes/unreleased/Features-20220610-105647.yaml b/.changes/unreleased/Features-20220610-105647.yaml new file mode 100644 index 00000000000..23b9836418c --- /dev/null +++ b/.changes/unreleased/Features-20220610-105647.yaml @@ -0,0 +1,7 @@ +kind: Features +body: Incremental materialization refactor and cleanup +time: 2022-06-10T10:56:47.226887-04:00 +custom: + Author: gshank + Issue: "5245" + PR: "5359" From 3b96f9e5378901cb71366fd015c73d9945f4e0dd Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Fri, 10 Jun 2022 16:25:37 -0400 Subject: [PATCH 03/13] Add adapter.dispatch calls to new macros --- .../models/incremental/strategies.sql | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql index 4719c134bdb..5226d01de16 100644 --- a/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql +++ b/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql @@ -1,5 +1,12 @@ {% macro get_incremental_append_sql(arg_dict) %} + {{ return(adapter.dispatch('get_incremental_append_sql', 'dbt')(arg_dict)) }} + +{% endmacro %} + + +{% macro default__get_incremental_append_sql(arg_dict) %} + {% do return(get_insert_into_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["dest_columns"])) %} {% endmacro %} @@ -8,6 +15,12 @@ {# snowflake #} {% macro get_incremental_delete_insert_sql(arg_dict) %} + {{ return(adapter.dispatch('get_incremental_delete_insert_sql', 'dbt')(arg_dict)) }} + +{% endmacro %} + +{% macro default__get_incremental_delete_insert_sql(arg_dict) %} + {% do return(get_delete_insert_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"])) %} {% endmacro %} @@ -16,6 +29,12 @@ {# snowflake, bigquery, spark #} {% macro get_incremental_merge_sql(arg_dict) %} + {{ return(adapter.dispatch('get_incremental_merge_sql', 'dbt')(arg_dict)) }} + +{% endmacro %} + +{% macro default__get_incremental_merge_sql(arg_dict) %} + {% do return(get_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["unique_key"], arg_dict["dest_columns"])) %} {% endmacro %} @@ -23,6 +42,12 @@ {% macro get_incremental_insert_overwrite_sql(arg_dict) %} + {{ return(adapter.dispatch('get_incremental_insert_overwrite_sql', 'dbt')(arg_dict)) }} + +{% endmacro %} + +{% macro default__get_incremental_insert_overwrite_sql(arg_dict) %} + {% do return(get_insert_overwrite_merge_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["dest_columns"], arg_dict["predicates"])) %} {% endmacro %} @@ -30,6 +55,12 @@ {% macro get_incremental_default_sql(arg_dict) %} + {{ return(adapter.dispatch('get_incremental_default_sql', 'dbt')(arg_dict)) }} + +{% endmacro %} + +{% macro default__get_incremental_default_sql(arg_dict) %} + {% do return(get_incremental_append_sql(arg_dict)) %} {% endmacro %} From 98f7b083976b6188cfaf1a6085d1cee4fc1e11eb Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Mon, 13 Jun 2022 09:43:30 -0400 Subject: [PATCH 04/13] update default to check unique_id and use delete_insert strategy --- .../materializations/models/incremental/strategies.sql | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql index 5226d01de16..da37698c553 100644 --- a/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql +++ b/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql @@ -61,7 +61,11 @@ {% macro default__get_incremental_default_sql(arg_dict) %} - {% do return(get_incremental_append_sql(arg_dict)) %} + {% if arg_dict["unique_key"] %} + {% do return(get_incremental_delete_insert_sql(arg_dict)) %} + {% else %} + {% do return(get_incremental_append_sql(arg_dict)) %} + {% endif %} {% endmacro %} From 6ac14be79de05315cc8e464f1195d8b8253ce7f7 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Mon, 13 Jun 2022 13:55:15 -0400 Subject: [PATCH 05/13] Add code to check valid incremental strategies --- core/dbt/adapters/base/impl.py | 17 +++++++++++++++-- .../materializations/test_incremental.py | 17 ++++++++--------- 2 files changed, 23 insertions(+), 11 deletions(-) diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index 661abd0f32c..4c084aca9fc 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -1125,7 +1125,7 @@ def get_rows_different_sql( return sql - def validate_incremental_strategy(self): + def valid_incremental_strategies(self): return ["append", "delete+insert"] @available.parse_none @@ -1133,6 +1133,16 @@ def get_incremental_strategy_macro(self, strategy: str): # Construct macro_name from strategy name if strategy is None: strategy = "default" + + # validate strategies for this adapter + valid_strategies = self.valid_incremental_strategies() + valid_strategies.append("default") + builtin_strategies = ["append", "delete+insert", "merge", "insert_overwrite"] + if strategy in builtin_strategies and strategy not in valid_strategies: + raise RuntimeException( + f"The incremental strategy '{strategy}' is not valid for this adapter" + ) + strategy = strategy.replace("+", "_") macro_name = f"get_incremental_{strategy}_sql" # get a macro and return it @@ -1140,7 +1150,7 @@ def get_incremental_strategy_macro(self, strategy: str): macro = manifest.find_macro_by_name(macro_name, self.config.project_name, None) if not macro: raise RuntimeException( - 'dbt could not find a macro with the name "{}" in {}'.format( + 'dbt could not find an incremental strategy macro with the name "{}" in {}'.format( macro_name, self.config.project_name ) ) @@ -1156,6 +1166,9 @@ def get_incremental_strategy_macro(self, strategy: str): macro_function = MacroGenerator(macro, macro_context) return macro_function + def validate_incremental_strategy(self): + return ["append", "delete+insert"] + COLUMNS_EQUAL_SQL = """ with diff_count as ( diff --git a/tests/functional/materializations/test_incremental.py b/tests/functional/materializations/test_incremental.py index a70da8cf7ba..11352bbd5dc 100644 --- a/tests/functional/materializations/test_incremental.py +++ b/tests/functional/materializations/test_incremental.py @@ -1,5 +1,6 @@ import pytest from dbt.tests.util import run_dbt +from dbt.exceptions import RuntimeException my_model_sql = """ @@ -28,13 +29,11 @@ def test_basic(project): assert macro_func assert type(macro_func).__name__ == "MacroGenerator" - # Note: the following 2 strategies aren't actually supported on Postgres, - # so will probably need to be removed before final version. - # Leaving here while developing - macro_func = project.adapter.get_incremental_strategy_macro("merge") - assert macro_func - assert type(macro_func).__name__ == "MacroGenerator" + # These two incremental strategies are not valid for Postgres + with pytest.raises(RuntimeException) as excinfo: + macro_func = project.adapter.get_incremental_strategy_macro("merge") + assert "merge" in str(excinfo.value) - macro_func = project.adapter.get_incremental_strategy_macro("insert_overwrite") - assert macro_func - assert type(macro_func).__name__ == "MacroGenerator" + with pytest.raises(RuntimeException) as excinfo: + macro_func = project.adapter.get_incremental_strategy_macro("insert_overwrite") + assert "insert_overwrite" in str(excinfo.value) From e4796a9378c18e7fef318aec9624a8906d74ce3b Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Mon, 13 Jun 2022 17:53:05 -0400 Subject: [PATCH 06/13] Create postgres default strategy macro --- .../materializations/models/incremental/strategies.sql | 6 +----- .../macros/materializations/incremental_strategies.sql | 9 +++++++++ 2 files changed, 10 insertions(+), 5 deletions(-) create mode 100644 plugins/postgres/dbt/include/postgres/macros/materializations/incremental_strategies.sql diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql index da37698c553..5226d01de16 100644 --- a/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql +++ b/core/dbt/include/global_project/macros/materializations/models/incremental/strategies.sql @@ -61,11 +61,7 @@ {% macro default__get_incremental_default_sql(arg_dict) %} - {% if arg_dict["unique_key"] %} - {% do return(get_incremental_delete_insert_sql(arg_dict)) %} - {% else %} - {% do return(get_incremental_append_sql(arg_dict)) %} - {% endif %} + {% do return(get_incremental_append_sql(arg_dict)) %} {% endmacro %} diff --git a/plugins/postgres/dbt/include/postgres/macros/materializations/incremental_strategies.sql b/plugins/postgres/dbt/include/postgres/macros/materializations/incremental_strategies.sql new file mode 100644 index 00000000000..f2fbf41e090 --- /dev/null +++ b/plugins/postgres/dbt/include/postgres/macros/materializations/incremental_strategies.sql @@ -0,0 +1,9 @@ +{% macro postgres__get_incremental_default_sql(arg_dict) %} + + {% if arg_dict["unique_key"] %} + {% do return(get_incremental_delete_insert_sql(arg_dict)) %} + {% else %} + {% do return(get_incremental_append_sql(arg_dict)) %} + {% endif %} + +{% endmacro %} From ae691c06867c0961d05e9ccfc4b5887de1170267 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Mon, 18 Jul 2022 09:02:06 -0400 Subject: [PATCH 07/13] Set 'default' for incremental_strategy --- .../macros/materializations/models/incremental/incremental.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql index 01bcc9a9fa7..679176fb29c 100644 --- a/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql +++ b/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql @@ -49,7 +49,7 @@ {% endif %} {#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#} - {% set incremental_strategy = config.get('incremental_strategy', 'default') %} + {% set incremental_strategy = config.get('incremental_strategy') or 'default' %} {% set incremental_predicates = config.get('incremental_predicates', none) %} {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(incremental_strategy) %} {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'predicates': incremental_predicates }) %} From ee002a961526068441c33596078d721cf086eefc Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Tue, 19 Jul 2022 12:25:05 -0400 Subject: [PATCH 08/13] Uncomment sql_header and use config to get include_sql_header --- .../macros/materializations/models/incremental/merge.sql | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql index c5f2bb83bde..c33c673f70c 100644 --- a/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql +++ b/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql @@ -26,7 +26,7 @@ {% do predicates.append('FALSE') %} {% endif %} - {# sql_header if sql_header is not none #} + {{ sql_header if sql_header is not none }} merge into {{ target }} as DBT_INTERNAL_DEST using {{ source }} as DBT_INTERNAL_SOURCE @@ -94,8 +94,10 @@ {%- set predicates = [] if predicates is none else [] + predicates -%} {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} {%- set sql_header = config.get('sql_header', none) -%} + {# The following is only true in BigQuery #} + {%- set include_sql_header = config.get('include_sql_header', false) -%} - {{ sql_header if sql_header is not none }} + {{ sql_header if sql_header is not none and include_sql_header }} merge into {{ target }} as DBT_INTERNAL_DEST using {{ source }} as DBT_INTERNAL_SOURCE From e9e811eb99e736c7028f4d52db4b9da8c860e17a Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Tue, 19 Jul 2022 13:29:27 -0400 Subject: [PATCH 09/13] remove stray method --- core/dbt/adapters/base/impl.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index 916f7563b61..9c3af8356c8 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -1203,9 +1203,6 @@ def get_incremental_strategy_macro(self, strategy: str): macro_function = MacroGenerator(macro, macro_context) return macro_function - def validate_incremental_strategy(self): - return ["append", "delete+insert"] - COLUMNS_EQUAL_SQL = """ with diff_count as ( From c571130c5ae4d59a4b0ccf70fa634631222bb5a7 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Wed, 20 Jul 2022 19:34:34 -0400 Subject: [PATCH 10/13] Pass model context to get_incremental_strategy_macro method --- core/dbt/adapters/base/impl.py | 20 ++++----------- .../models/incremental/incremental.sql | 2 +- .../materializations/test_incremental.py | 25 ++++++++++++++----- 3 files changed, 25 insertions(+), 22 deletions(-) diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index 9c3af8356c8..651a0cb42c7 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -1166,7 +1166,7 @@ def valid_incremental_strategies(self): return ["append", "delete+insert"] @available.parse_none - def get_incremental_strategy_macro(self, strategy: str): + def get_incremental_strategy_macro(self, model_context, strategy: str): # Construct macro_name from strategy name if strategy is None: strategy = "default" @@ -1182,26 +1182,16 @@ def get_incremental_strategy_macro(self, strategy: str): strategy = strategy.replace("+", "_") macro_name = f"get_incremental_{strategy}_sql" - # get a macro and return it - manifest = self._macro_manifest - macro = manifest.find_macro_by_name(macro_name, self.config.project_name, None) - if not macro: + # The model_context should have MacroGenerator callable objects for all macros + if macro_name not in model_context: raise RuntimeException( 'dbt could not find an incremental strategy macro with the name "{}" in {}'.format( macro_name, self.config.project_name ) ) - from dbt.context.providers import generate_runtime_macro_context - - macro_context = generate_runtime_macro_context( - macro=macro, - config=self.config, - manifest=manifest, # type: ignore[arg-type] - package_name=self.config.project_name, - ) - macro_function = MacroGenerator(macro, macro_context) - return macro_function + # This returns a callable macro + return model_context[macro_name] COLUMNS_EQUAL_SQL = """ diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql index 679176fb29c..602067616d2 100644 --- a/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql +++ b/core/dbt/include/global_project/macros/materializations/models/incremental/incremental.sql @@ -51,7 +51,7 @@ {#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#} {% set incremental_strategy = config.get('incremental_strategy') or 'default' %} {% set incremental_predicates = config.get('incremental_predicates', none) %} - {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(incremental_strategy) %} + {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %} {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'predicates': incremental_predicates }) %} {% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %} diff --git a/tests/functional/materializations/test_incremental.py b/tests/functional/materializations/test_incremental.py index 11352bbd5dc..361ae8193d0 100644 --- a/tests/functional/materializations/test_incremental.py +++ b/tests/functional/materializations/test_incremental.py @@ -1,6 +1,7 @@ import pytest -from dbt.tests.util import run_dbt +from dbt.tests.util import run_dbt, get_manifest from dbt.exceptions import RuntimeException +from dbt.context.providers import generate_runtime_model_context my_model_sql = """ @@ -17,23 +18,35 @@ def test_basic(project): results = run_dbt(["run"]) assert len(results) == 1 - macro_func = project.adapter.get_incremental_strategy_macro("default") + manifest = get_manifest(project.project_root) + model = manifest.nodes['model.test.my_model'] + + # Normally the context will be provided by the macro that calls the + # get_incrmental_strategy_macro method, but for testing purposes + # we create a runtime_model_context. + context = generate_runtime_model_context( + model, + project.adapter.config, + manifest, + ) + + macro_func = project.adapter.get_incremental_strategy_macro(context, "default") assert macro_func assert type(macro_func).__name__ == "MacroGenerator" - macro_func = project.adapter.get_incremental_strategy_macro("append") + macro_func = project.adapter.get_incremental_strategy_macro(context, "append") assert macro_func assert type(macro_func).__name__ == "MacroGenerator" - macro_func = project.adapter.get_incremental_strategy_macro("delete+insert") + macro_func = project.adapter.get_incremental_strategy_macro(context, "delete+insert") assert macro_func assert type(macro_func).__name__ == "MacroGenerator" # These two incremental strategies are not valid for Postgres with pytest.raises(RuntimeException) as excinfo: - macro_func = project.adapter.get_incremental_strategy_macro("merge") + macro_func = project.adapter.get_incremental_strategy_macro(context, "merge") assert "merge" in str(excinfo.value) with pytest.raises(RuntimeException) as excinfo: - macro_func = project.adapter.get_incremental_strategy_macro("insert_overwrite") + macro_func = project.adapter.get_incremental_strategy_macro(context, "insert_overwrite") assert "insert_overwrite" in str(excinfo.value) From ea7038a1661a08694519121e68c97ce32a1432f0 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Wed, 20 Jul 2022 20:06:07 -0400 Subject: [PATCH 11/13] Various tweaks and comments --- core/dbt/adapters/base/impl.py | 8 +++++++- .../macros/materializations/models/incremental/merge.sql | 3 ++- tests/functional/materializations/test_incremental.py | 2 +- 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index 651a0cb42c7..43a2ebb5ec8 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -1163,8 +1163,14 @@ def get_rows_different_sql( return sql def valid_incremental_strategies(self): + """The set of standard builtin strategies which this adapter supports out-of-the-box. + Not used to validate custom strategies defined by end users. + """ return ["append", "delete+insert"] + def builtin_incremental_strategies(self): + return ["append", "delete+insert", "merge", "insert_overwrite"] + @available.parse_none def get_incremental_strategy_macro(self, model_context, strategy: str): # Construct macro_name from strategy name @@ -1174,7 +1180,7 @@ def get_incremental_strategy_macro(self, model_context, strategy: str): # validate strategies for this adapter valid_strategies = self.valid_incremental_strategies() valid_strategies.append("default") - builtin_strategies = ["append", "delete+insert", "merge", "insert_overwrite"] + builtin_strategies = self.builtin_incremental_strategies() if strategy in builtin_strategies and strategy not in valid_strategies: raise RuntimeException( f"The incremental strategy '{strategy}' is not valid for this adapter" diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql index c33c673f70c..bf4e2ef86f8 100644 --- a/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql +++ b/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql @@ -94,7 +94,8 @@ {%- set predicates = [] if predicates is none else [] + predicates -%} {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} {%- set sql_header = config.get('sql_header', none) -%} - {# The following is only true in BigQuery #} + {#-- The only time include_sql_header = True: --#} + {#-- BigQuery + insert_overwrite strategy + "static" partitions config --#} {%- set include_sql_header = config.get('include_sql_header', false) -%} {{ sql_header if sql_header is not none and include_sql_header }} diff --git a/tests/functional/materializations/test_incremental.py b/tests/functional/materializations/test_incremental.py index 361ae8193d0..f6ec8b2a3e9 100644 --- a/tests/functional/materializations/test_incremental.py +++ b/tests/functional/materializations/test_incremental.py @@ -19,7 +19,7 @@ def test_basic(project): assert len(results) == 1 manifest = get_manifest(project.project_root) - model = manifest.nodes['model.test.my_model'] + model = manifest.nodes["model.test.my_model"] # Normally the context will be provided by the macro that calls the # get_incrmental_strategy_macro method, but for testing purposes From 69d0e8f3c47b0950c7a7434a18f1467c120f2e74 Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Thu, 21 Jul 2022 09:17:23 -0400 Subject: [PATCH 12/13] Change default valid_incremental_strategies in base and postgres --- core/dbt/adapters/base/impl.py | 2 +- plugins/postgres/dbt/adapters/postgres/impl.py | 6 ++++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/dbt/adapters/base/impl.py b/core/dbt/adapters/base/impl.py index 43a2ebb5ec8..0e8bb322f58 100644 --- a/core/dbt/adapters/base/impl.py +++ b/core/dbt/adapters/base/impl.py @@ -1166,7 +1166,7 @@ def valid_incremental_strategies(self): """The set of standard builtin strategies which this adapter supports out-of-the-box. Not used to validate custom strategies defined by end users. """ - return ["append", "delete+insert"] + return ["append"] def builtin_incremental_strategies(self): return ["append", "delete+insert", "merge", "insert_overwrite"] diff --git a/plugins/postgres/dbt/adapters/postgres/impl.py b/plugins/postgres/dbt/adapters/postgres/impl.py index 116ec5e7727..3664e8d2a51 100644 --- a/plugins/postgres/dbt/adapters/postgres/impl.py +++ b/plugins/postgres/dbt/adapters/postgres/impl.py @@ -132,3 +132,9 @@ def _relations_cache_for_schemas(self, manifest, cache_schemas=None): def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str: return f"{add_to} + interval '{number} {interval}'" + + def valid_incremental_strategies(self): + """The set of standard builtin strategies which this adapter supports out-of-the-box. + Not used to validate custom strategies defined by end users. + """ + return ["append", "delete+insert"] From 1bea247eb67cb2d86d31e0178527d0a7f1ff887b Mon Sep 17 00:00:00 2001 From: Gerda Shank Date: Thu, 21 Jul 2022 13:36:45 -0400 Subject: [PATCH 13/13] Put back include_sql_header param --- .../materializations/models/incremental/merge.sql | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql b/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql index bf4e2ef86f8..713f02ccc8e 100644 --- a/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql +++ b/core/dbt/include/global_project/macros/materializations/models/incremental/merge.sql @@ -86,17 +86,18 @@ {%- endmacro %} -{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates) -%} - {{ adapter.dispatch('get_insert_overwrite_merge_sql', 'dbt')(target, source, dest_columns, predicates) }} +{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header=false) -%} + {{ adapter.dispatch('get_insert_overwrite_merge_sql', 'dbt')(target, source, dest_columns, predicates, include_sql_header) }} {%- endmacro %} -{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates) -%} +{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%} + {#-- The only time include_sql_header is True: --#} + {#-- BigQuery + insert_overwrite strategy + "static" partitions config --#} + {#-- We should consider including the sql header at the materialization level instead --#} + {%- set predicates = [] if predicates is none else [] + predicates -%} {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} {%- set sql_header = config.get('sql_header', none) -%} - {#-- The only time include_sql_header = True: --#} - {#-- BigQuery + insert_overwrite strategy + "static" partitions config --#} - {%- set include_sql_header = config.get('include_sql_header', false) -%} {{ sql_header if sql_header is not none and include_sql_header }}