Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial refactoring of incremental materialization #5359

Merged
merged 15 commits into from
Jul 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20220610-105647.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Changelog entry for the incremental materialization refactor.
kind: Features
body: Incremental materialization refactor and cleanup
time: 2022-06-10T10:56:47.226887-04:00
custom:
  # These keys must be nested under `custom`; left at the top level they
  # would be parsed as siblings of `custom` and leave it empty.
  Author: gshank
  Issue: "5245"
  PR: "5359"
37 changes: 37 additions & 0 deletions core/dbt/adapters/base/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -1162,6 +1162,43 @@ def get_rows_different_sql(

return sql

def valid_incremental_strategies(self):
    """Return the standard builtin strategies this adapter supports out-of-the-box.

    This list is not used to validate custom strategies defined by end users.
    """
    supported_strategies = ["append"]
    return supported_strategies

def builtin_incremental_strategies(self):
    """Return the list of builtin incremental strategy names."""
    return [
        "append",
        "delete+insert",
        "merge",
        "insert_overwrite",
    ]

@available.parse_none
def get_incremental_strategy_macro(self, model_context, strategy: str):
    """Look up the macro that builds the SQL for an incremental strategy.

    :param model_context: mapping of macro names to callables; it should hold
        MacroGenerator callable objects for all macros.
    :param strategy: strategy name; ``None`` is treated as ``"default"``.
    :return: the callable stored in ``model_context`` under
        ``get_incremental_<strategy>_sql``.
    :raises RuntimeException: if a builtin strategy is not supported by this
        adapter, or if no macro with the derived name is in ``model_context``.
    """
    # Construct macro_name from strategy name
    if strategy is None:
        strategy = "default"

    # Validate builtin strategies for this adapter. Build a new list instead of
    # appending to the one returned by valid_incremental_strategies(), so an
    # adapter that returns a shared list never has it modified here.
    valid_strategies = [*self.valid_incremental_strategies(), "default"]
    builtin_strategies = self.builtin_incremental_strategies()
    if strategy in builtin_strategies and strategy not in valid_strategies:
        raise RuntimeException(
            f"The incremental strategy '{strategy}' is not valid for this adapter"
        )

    # "+" is replaced for the macro name, e.g. "delete+insert" -> "delete_insert".
    strategy = strategy.replace("+", "_")
    macro_name = f"get_incremental_{strategy}_sql"
    # The model_context should have MacroGenerator callable objects for all macros
    if macro_name not in model_context:
        raise RuntimeException(
            'dbt could not find an incremental strategy macro with the name "{}" in {}'.format(
                macro_name, self.config.project_name
            )
        )

    # This returns a callable macro
    return model_context[macro_name]


COLUMNS_EQUAL_SQL = """
with diff_count as (
Expand Down
1 change: 1 addition & 0 deletions core/dbt/contracts/graph/model_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ class NodeConfig(NodeAndTestConfig):
# Note: if any new fields are added with MergeBehavior, also update the
# 'mergebehavior' dictionary
materialized: str = "view"
incremental_strategy: Optional[str] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gshank @nathaniel-may Does this mean that the value of incremental_strategy is now in the manifest.json when it wasn't before? Or does this add it to the Python context such that it is accessible as an attribute of model, as in model.incremental_strategy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means that it should be in the manifest.json. It's accessible like other config keys, but the behavior is a bit different for builtin attributes than for adhoc attributes, in that setting defaults doesn't work the same way.

persist_docs: Dict[str, Any] = field(default_factory=dict)
post_hook: List[Hook] = field(
default_factory=list,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,13 @@
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}
{% set build_sql = get_delete_insert_merge_sql(target_relation, temp_relation, unique_key, dest_columns) %}

{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set incremental_predicates = config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'predicates': incremental_predicates }) %}
{% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}

{% endif %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
);

{% endif %}
{% endif %}
{% endif %}

insert into {{ target }} ({{ dest_cols_csv }})
(
Expand All @@ -91,6 +91,10 @@
{%- endmacro %}

{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates, include_sql_header) -%}
{#-- The only time include_sql_header is True: --#}
{#-- BigQuery + insert_overwrite strategy + "static" partitions config --#}
{#-- We should consider including the sql header at the materialization level instead --#}

{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set sql_header = config.get('sql_header', none) -%}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
{% macro get_incremental_append_sql(arg_dict) %}
  {#-- Dispatch to the adapter-specific implementation of the append strategy. --#}
  {% set append_sql_macro = adapter.dispatch('get_incremental_append_sql', 'dbt') %}
  {% do return(append_sql_macro(arg_dict)) %}
{% endmacro %}


{% macro default__get_incremental_append_sql(arg_dict) %}
  {#-- Default append strategy: build an insert from the temp relation into the target relation. --#}

  {% do return(get_insert_into_sql(arg_dict["target_relation"], arg_dict["temp_relation"], arg_dict["dest_columns"])) %}

{% endmacro %}


{# snowflake #}
{% macro get_incremental_delete_insert_sql(arg_dict) %}
  {#-- Dispatch to the adapter-specific implementation of the delete+insert strategy. --#}
  {% set delete_insert_sql_macro = adapter.dispatch('get_incremental_delete_insert_sql', 'dbt') %}
  {% do return(delete_insert_sql_macro(arg_dict)) %}
{% endmacro %}

{% macro default__get_incremental_delete_insert_sql(arg_dict) %}
  {#-- Unpack the strategy arguments and delegate to get_delete_insert_merge_sql. --#}
  {% set delete_insert_sql = get_delete_insert_merge_sql(
      arg_dict["target_relation"],
      arg_dict["temp_relation"],
      arg_dict["unique_key"],
      arg_dict["dest_columns"]
  ) %}
  {{ return(delete_insert_sql) }}
{% endmacro %}


{# snowflake, bigquery, spark #}
{% macro get_incremental_merge_sql(arg_dict) %}
  {#-- Dispatch to the adapter-specific implementation of the merge strategy. --#}
  {% set merge_sql_macro = adapter.dispatch('get_incremental_merge_sql', 'dbt') %}
  {% do return(merge_sql_macro(arg_dict)) %}
{% endmacro %}

{% macro default__get_incremental_merge_sql(arg_dict) %}
  {#-- Unpack the strategy arguments and delegate to get_merge_sql. --#}
  {% set merge_sql = get_merge_sql(
      arg_dict["target_relation"],
      arg_dict["temp_relation"],
      arg_dict["unique_key"],
      arg_dict["dest_columns"]
  ) %}
  {{ return(merge_sql) }}
{% endmacro %}
jtcohen6 marked this conversation as resolved.
Show resolved Hide resolved


{% macro get_incremental_insert_overwrite_sql(arg_dict) %}
  {#-- Dispatch to the adapter-specific implementation of the insert_overwrite strategy. --#}
  {% set insert_overwrite_sql_macro = adapter.dispatch('get_incremental_insert_overwrite_sql', 'dbt') %}
  {% do return(insert_overwrite_sql_macro(arg_dict)) %}
{% endmacro %}

{% macro default__get_incremental_insert_overwrite_sql(arg_dict) %}
  {#-- Unpack the strategy arguments and delegate to get_insert_overwrite_merge_sql. --#}
  {% set insert_overwrite_sql = get_insert_overwrite_merge_sql(
      arg_dict["target_relation"],
      arg_dict["temp_relation"],
      arg_dict["dest_columns"],
      arg_dict["predicates"]
  ) %}
  {{ return(insert_overwrite_sql) }}
{% endmacro %}


{% macro get_incremental_default_sql(arg_dict) %}
  {#-- Dispatch to the adapter-specific implementation of the default strategy. --#}
  {% set default_sql_macro = adapter.dispatch('get_incremental_default_sql', 'dbt') %}
  {% do return(default_sql_macro(arg_dict)) %}
{% endmacro %}

{% macro default__get_incremental_default_sql(arg_dict) %}
  {#-- The default strategy delegates to the append strategy. --#}
  {% set default_sql = get_incremental_append_sql(arg_dict) %}
  {{ return(default_sql) }}
{% endmacro %}
Comment on lines +62 to +66
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This represents an implicit breaking change for maintainers of existing adapter plugins who use the default incremental materialization:

  • Previously, the (implicit) default was delete+insert
  • Now, the explicit default is append

I think it's a good change! It's just one we'll want to document very clearly

(cc @dataders)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opened a docs issue: dbt-labs/docs.getdbt.com#1761

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes in incremental.sql made me wonder the same thing about default behavior.



{% macro get_insert_into_sql(target_relation, temp_relation, dest_columns) %}
    {#-- Render an insert statement that selects the destination columns from temp_relation into target_relation. --#}

    {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

    insert into {{ target_relation }} ({{ dest_cols_csv }})
    (
        select {{ dest_cols_csv }}
        from {{ temp_relation }}
    )

{% endmacro %}
6 changes: 6 additions & 0 deletions plugins/postgres/dbt/adapters/postgres/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,9 @@ def _relations_cache_for_schemas(self, manifest, cache_schemas=None):

def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
return f"{add_to} + interval '{number} {interval}'"

def valid_incremental_strategies(self):
    """Return the standard builtin strategies this adapter supports out-of-the-box.

    This list is not used to validate custom strategies defined by end users.
    """
    supported_strategies = ["append", "delete+insert"]
    return supported_strategies
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
{% macro postgres__get_incremental_default_sql(arg_dict) %}
  {#-- Postgres default: append when no unique_key is given, otherwise delete+insert. --#}
  {% if not arg_dict["unique_key"] %}
    {{ return(get_incremental_append_sql(arg_dict)) }}
  {% else %}
    {{ return(get_incremental_delete_insert_sql(arg_dict)) }}
  {% endif %}
{% endmacro %}
Comment on lines +1 to +9
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Love to see this!!

3 changes: 3 additions & 0 deletions tests/functional/artifacts/expected_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def get_rendered_model_config(**updates):
"meta": {},
"unique_key": None,
"grants": {},
"incremental_strategy": None,
jtcohen6 marked this conversation as resolved.
Show resolved Hide resolved
}
result.update(updates)
return result
Expand Down Expand Up @@ -59,6 +60,7 @@ def get_rendered_seed_config(**updates):
"meta": {},
"unique_key": None,
"grants": {},
"incremental_strategy": None,
}
result.update(updates)
return result
Expand Down Expand Up @@ -91,6 +93,7 @@ def get_rendered_snapshot_config(**updates):
"target_schema": None,
"meta": {},
"grants": {},
"incremental_strategy": None,
}
result.update(updates)
return result
Expand Down
9 changes: 8 additions & 1 deletion tests/functional/list/test_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ def expect_snapshot_output(self, project):
"on_schema_change": "ignore",
"meta": {},
"grants": {},
"incremental_strategy": None,
},
"unique_id": "snapshot.test.my_snapshot",
"original_file_path": normalize("snapshots/snapshot.sql"),
Expand Down Expand Up @@ -127,6 +128,7 @@ def expect_analyses_output(self):
"meta": {},
"unique_key": None,
"grants": {},
"incremental_strategy": None,
},
"unique_id": "analysis.test.a",
"original_file_path": normalize("analyses/a.sql"),
Expand Down Expand Up @@ -164,6 +166,7 @@ def expect_model_output(self):
"alias": None,
"meta": {},
"grants": {},
"incremental_strategy": None,
},
"original_file_path": normalize("models/ephemeral.sql"),
"unique_id": "model.test.ephemeral",
Expand All @@ -190,12 +193,12 @@ def expect_model_output(self):
"full_refresh": None,
"unique_key": None,
"on_schema_change": "ignore",
"incremental_strategy": "delete+insert",
"database": None,
"schema": None,
"alias": None,
"meta": {},
"grants": {},
"incremental_strategy": "delete+insert",
},
"original_file_path": normalize("models/incremental.sql"),
"unique_id": "model.test.incremental",
Expand Down Expand Up @@ -224,6 +227,7 @@ def expect_model_output(self):
"alias": None,
"meta": {},
"grants": {},
"incremental_strategy": None,
},
"original_file_path": normalize("models/sub/inner.sql"),
"unique_id": "model.test.inner",
Expand Down Expand Up @@ -252,6 +256,7 @@ def expect_model_output(self):
"alias": None,
"meta": {},
"grants": {},
"incremental_strategy": None,
},
"original_file_path": normalize("models/outer.sql"),
"unique_id": "model.test.outer",
Expand Down Expand Up @@ -295,6 +300,7 @@ def expect_model_ephemeral_output(self):
"alias": None,
"meta": {},
"grants": {},
"incremental_strategy": None,
},
"unique_id": "model.test.ephemeral",
"original_file_path": normalize("models/ephemeral.sql"),
Expand Down Expand Up @@ -357,6 +363,7 @@ def expect_seed_output(self):
"alias": None,
"meta": {},
"grants": {},
"incremental_strategy": None,
},
"unique_id": "seed.test.seed",
"original_file_path": normalize("seeds/seed.csv"),
Expand Down
52 changes: 52 additions & 0 deletions tests/functional/materializations/test_incremental.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import pytest
from dbt.tests.util import run_dbt, get_manifest
from dbt.exceptions import RuntimeException
from dbt.context.providers import generate_runtime_model_context


# SQL source of the single-column model registered as "my_model.sql".
my_model_sql = """
select 1 as fun
"""


@pytest.fixture(scope="class")
def models():
    """Provide the project's model files, keyed by file name."""
    model_files = {}
    model_files["my_model.sql"] = my_model_sql
    return model_files


def test_basic(project):
    """Check which incremental strategy macros the project's adapter resolves."""
    results = run_dbt(["run"])
    assert len(results) == 1

    manifest = get_manifest(project.project_root)
    model = manifest.nodes["model.test.my_model"]

    # Normally the context will be provided by the macro that calls the
    # get_incremental_strategy_macro method, but for testing purposes
    # we create a runtime_model_context.
    context = generate_runtime_model_context(
        model,
        project.adapter.config,
        manifest,
    )

    # Strategies that must resolve to a callable macro.
    for strategy in ("default", "append", "delete+insert"):
        macro_func = project.adapter.get_incremental_strategy_macro(context, strategy)
        assert macro_func
        assert type(macro_func).__name__ == "MacroGenerator"

    # These two incremental strategies are not valid for Postgres
    for strategy in ("merge", "insert_overwrite"):
        with pytest.raises(RuntimeException) as excinfo:
            project.adapter.get_incremental_strategy_macro(context, strategy)
        # Asserted after the with block: statements following the raising
        # call inside the block would never execute.
        assert strategy in str(excinfo.value)