From e878d0e76ec0cf0acf1dd1319e3975c8df43ba28 Mon Sep 17 00:00:00 2001 From: Jeremy Cohen Date: Mon, 24 Feb 2020 17:28:36 -0500 Subject: [PATCH] Add insert_overwrite as incremental strategy on BQ --- .../macros/materializations/common/merge.sql | 25 +++++++++++++ .../macros/materializations/incremental.sql | 30 ++++++++++++++-- .../data/incremental_overwrite_expected.csv | 5 +++ .../incremental_overwrite.sql | 36 +++++++++++++++++++ .../022_bigquery_test/test_scripting.py | 8 +++-- .../052_column_quoting/test_column_quotes.py | 6 ++-- 6 files changed, 103 insertions(+), 7 deletions(-) create mode 100644 test/integration/022_bigquery_test/data/incremental_overwrite_expected.csv create mode 100644 test/integration/022_bigquery_test/scripting-models/incremental_overwrite.sql diff --git a/core/dbt/include/global_project/macros/materializations/common/merge.sql b/core/dbt/include/global_project/macros/materializations/common/merge.sql index b02be5b8b1e..dcbcc1a356d 100644 --- a/core/dbt/include/global_project/macros/materializations/common/merge.sql +++ b/core/dbt/include/global_project/macros/materializations/common/merge.sql @@ -10,6 +10,11 @@ {%- endmacro %} +{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates) -%} + {{ adapter_macro('get_insert_overwrite_merge_sql', target, source, dest_columns, predicates) }} +{%- endmacro %} + + {% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%} {%- set predicates = [] if predicates is none else [] + predicates -%} {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} @@ -77,3 +82,23 @@ {% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%} {{ common_get_delete_insert_merge_sql(target, source, unique_key, dest_columns) }} {% endmacro %} + + +{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates) -%} + {%- set predicates = [] if predicates is none else [] + predicates -%} + {%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} + + merge into {{ target }} as DBT_INTERNAL_DEST + using {{ source }} as DBT_INTERNAL_SOURCE + on FALSE + + when not matched by source + {% if predicates %} and {{ predicates | join(' and ') }} {% endif %} + then delete + + when not matched then insert + ({{ dest_cols_csv }}) + values + ({{ dest_cols_csv }}) + +{% endmacro %} diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql index 66e1c48c624..b550c0ca1a2 100644 --- a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql +++ b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql @@ -1,5 +1,21 @@ -{% macro bq_partition_merge(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns) %} +{% macro dbt_bigquery_validate_get_incremental_strategy(config) %} + {#-- Find and validate the incremental strategy #} + {%- set strategy = config.get("incremental_strategy", default="merge") -%} + + {% set invalid_strategy_msg -%} + Invalid incremental strategy provided: {{ strategy }} + Expected one of: 'merge', 'insert_overwrite' + {%- endset %} + {% if strategy not in ['merge', 'insert_overwrite'] %} + {% do exceptions.raise_compiler_error(invalid_strategy_msg) %} + {% endif %} + + {% do return(strategy) %} +{% endmacro %} + + +{% macro bq_partition_merge(strategy, tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns) %} {%- set partition_type = 'date' if partition_by.data_type in ('timestamp, datetime') else partition_by.data_type -%} @@ -31,9 +47,15 @@ array_agg(distinct {{ partition_by.render() }}) from {{ tmp_relation }} ); - + -- 3. run the merge statement + {% if strategy == 'merge' %} {{ get_merge_sql(target_relation, source_sql, unique_key, dest_columns, [predicate]) }}; + {% elif strategy == 'insert_overwrite' %} + {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }}; + {% else %} + {% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %} + {% endif %} -- 4. clean up the temp table drop table if exists {{ tmp_relation }} @@ -49,6 +71,9 @@ {%- set target_relation = this %} {%- set existing_relation = load_relation(this) %} {%- set tmp_relation = make_temp_relation(this) %} + + {#-- Validate early so we don't run SQL if the strategy is invalid --#} + {% set strategy = dbt_bigquery_validate_get_incremental_strategy(config) -%} {%- set raw_partition_by = config.get('partition_by', none) -%} {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%} @@ -75,6 +100,7 @@ {#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#} {% if partition_by is not none %} {% set build_sql = bq_partition_merge( + strategy, tmp_relation, target_relation, sql, diff --git a/test/integration/022_bigquery_test/data/incremental_overwrite_expected.csv b/test/integration/022_bigquery_test/data/incremental_overwrite_expected.csv new file mode 100644 index 00000000000..7454b880b7a --- /dev/null +++ b/test/integration/022_bigquery_test/data/incremental_overwrite_expected.csv @@ -0,0 +1,5 @@ +id,date_day +10,2020-01-01 +20,2020-01-01 +30,2020-01-02 +40,2020-01-02 diff --git a/test/integration/022_bigquery_test/scripting-models/incremental_overwrite.sql b/test/integration/022_bigquery_test/scripting-models/incremental_overwrite.sql new file mode 100644 index 00000000000..9f0c45a9acb --- /dev/null +++ b/test/integration/022_bigquery_test/scripting-models/incremental_overwrite.sql @@ -0,0 +1,36 @@ + +{{ + config( + materialized="incremental", + incremental_strategy='insert_overwrite', + cluster_by="id", + partition_by={ + "field": "date_day", + "data_type": "date" + } + ) +}} + + +with data as ( + select 1 as id, cast('2020-01-01' as date) as date_day union all + select 2 as id, cast('2020-01-01' as date) as date_day union all + select 3 as id, cast('2020-01-01' as date) as date_day union all + select 4 as id, cast('2020-01-01' as date) as date_day + + {% if is_incremental() %} + union all + -- we want to overwrite the 4 records in the 2020-01-01 partition + -- with the 2 records below, but add two more in the 2020-01-01 partition + select 10 as id, cast('2020-01-01' as date) as date_day union all + select 20 as id, cast('2020-01-01' as date) as date_day union all + select 30 as id, cast('2020-01-02' as date) as date_day union all + select 40 as id, cast('2020-01-02' as date) as date_day + {% endif %} +) + +select * from data + +{% if is_incremental() %} +where ts >= _dbt_max_partition +{% endif %} diff --git a/test/integration/022_bigquery_test/test_scripting.py b/test/integration/022_bigquery_test/test_scripting.py index 7199f634485..eeb05c0b0d0 100644 --- a/test/integration/022_bigquery_test/test_scripting.py +++ b/test/integration/022_bigquery_test/test_scripting.py @@ -16,7 +16,11 @@ def profile_config(self): def assert_incrementals(self): results = self.run_dbt() - self.assertEqual(len(results), 2) + self.assertEqual(len(results), 3) self.run_dbt() - self.assertEqual(len(results), 2) + self.assertEqual(len(results), 3) + + results = self.run_dbt(['seed']) + + self.assertTablesEqual('incremental_overwrite', 'incremental_overwrite_expected') diff --git a/test/integration/052_column_quoting/test_column_quotes.py b/test/integration/052_column_quoting/test_column_quotes.py index a31c1c6e9fb..3546d7a2fb8 100644 --- a/test/integration/052_column_quoting/test_column_quotes.py +++ b/test/integration/052_column_quoting/test_column_quotes.py @@ -47,7 +47,7 @@ def test_snowflake_column_quotes(self): @use_profile('bigquery') def test_bigquery_column_quotes(self): - self._run_columnn_quotes() + self._run_columnn_quotes(strategy='insert_overwrite') class TestColumnQuotingDisabled(BaseColumnQuotingTest): @@ -77,7 +77,7 @@ def test_snowflake_column_quotes(self): @use_profile('bigquery') def test_bigquery_column_quotes(self): - self._run_columnn_quotes() + self._run_columnn_quotes(strategy='insert_overwrite') @use_profile('snowflake') def test_snowflake_column_quotes_merged(self): @@ -115,7 +115,7 @@ def test_snowflake_column_quotes(self): @use_profile('bigquery') def test_bigquery_column_quotes(self): - self._run_columnn_quotes() + self._run_columnn_quotes(strategy='insert_overwrite') @use_profile('snowflake') def test_snowflake_column_quotes_merged(self):