Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/bq incremental strategy insert_overwrite #2153

Merged
merged 2 commits into from
Mar 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- Render `meta` fields as "details" in node views ([dbt-docs#73](https://github.com/fishtown-analytics/dbt-docs/pull/73))
- Default to lower-casing Snowflake columns specified in all-caps ([dbt-docs#74](https://github.com/fishtown-analytics/dbt-docs/pull/74))
- Upgrade site dependencies
- Support the `insert_overwrite` incremental strategy for BigQuery incremental models ([#2153](https://github.com/fishtown-analytics/dbt/pull/2153))


### Under the hood
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,11 @@
{%- endmacro %}


{#
  Entry point for building the insert-overwrite merge statement.

  Forwards target, source, dest_columns and predicates unchanged to
  `adapter_macro`, which picks the implementation to render.
  NOTE(review): presumably this resolves to an adapter-specific override and
  falls back to `default__get_insert_overwrite_merge_sql` -- confirm against
  `adapter_macro`.
#}
{% macro get_insert_overwrite_merge_sql(target, source, dest_columns, predicates) -%}
{{ adapter_macro('get_insert_overwrite_merge_sql', target, source, dest_columns, predicates) }}
{%- endmacro %}


{% macro default__get_merge_sql(target, source, unique_key, dest_columns, predicates) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
Expand Down Expand Up @@ -77,3 +82,23 @@
{#
  Default implementation of the delete+insert merge statement.

  Renders whatever `common_get_delete_insert_merge_sql` produces for the same
  target, source, unique_key and dest_columns; it adds nothing of its own.
#}
{% macro default__get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
{{ common_get_delete_insert_merge_sql(target, source, unique_key, dest_columns) }}
{% endmacro %}


{% macro default__get_insert_overwrite_merge_sql(target, source, dest_columns, predicates) -%}
{%- set predicates = [] if predicates is none else [] + predicates -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

merge into {{ target }} as DBT_INTERNAL_DEST
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i could live 1,000 more years and i would still not understand this DML...

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have lived 2 days and I have read the documentation, and I now understand this DML.

The key thing I was missing: `when not matched by source` is always true here because we're using a constant-false predicate (`on FALSE`). I was mistakenly thinking that we were still merging on a `unique_key`, which we are not.

From the docs:

If the merge_condition is FALSE, the query optimizer avoids using a JOIN. This optimization is referred to as a constant false predicate. A constant false predicate is useful when you perform an atomic DELETE on the target plus an INSERT from a source (DELETE with INSERT is also known as a REPLACE operation).

Cool!

using {{ source }} as DBT_INTERNAL_SOURCE
on FALSE

when not matched by source
{% if predicates %} and {{ predicates | join(' and ') }} {% endif %}
then delete

when not matched then insert
({{ dest_cols_csv }})
values
({{ dest_cols_csv }})

{% endmacro %}
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@

{% macro bq_partition_merge(tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns) %}
{#
  Look up and validate the model's incremental strategy.

  Args:
    config: object whose `get(name, default=...)` is used to read the
            `incremental_strategy` setting; "merge" is used when it is unset.

  Returns (via `return()`): the strategy value, which is one of 'merge' or
  'insert_overwrite'.

  Raises: a compiler error (through `exceptions.raise_compiler_error`) naming
  the offending value when the strategy is anything else.
#}
{% macro dbt_bigquery_validate_get_incremental_strategy(config) %}
{#-- Find and validate the incremental strategy #}
{%- set strategy = config.get("incremental_strategy", default="merge") -%}

{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ strategy }}
Expected one of: 'merge', 'insert_overwrite'
{%- endset %}
{% if strategy not in ['merge', 'insert_overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{% endif %}

{% do return(strategy) %}
{% endmacro %}


{% macro bq_partition_merge(strategy, tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns) %}
{%- set partition_type =
'date' if partition_by.data_type in ('timestamp, datetime')
else partition_by.data_type -%}
Expand Down Expand Up @@ -31,9 +47,15 @@
array_agg(distinct {{ partition_by.render() }})
from {{ tmp_relation }}
);

-- 3. run the merge statement
{% if strategy == 'merge' %}
{{ get_merge_sql(target_relation, source_sql, unique_key, dest_columns, [predicate]) }};
{% elif strategy == 'insert_overwrite' %}
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
{% else %}
{% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %}
{% endif %}

-- 4. clean up the temp table
drop table if exists {{ tmp_relation }}
Expand All @@ -49,6 +71,9 @@
{%- set target_relation = this %}
{%- set existing_relation = load_relation(this) %}
{%- set tmp_relation = make_temp_relation(this) %}

{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = dbt_bigquery_validate_get_incremental_strategy(config) -%}

{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
Expand All @@ -75,6 +100,7 @@
{#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
{% if partition_by is not none %}
{% set build_sql = bq_partition_merge(
strategy,
tmp_relation,
target_relation,
sql,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
id,date_day
10,2020-01-01
20,2020-01-01
30,2020-01-02
40,2020-01-02
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@

{{
  config(
    materialized="incremental",
    incremental_strategy='insert_overwrite',
    cluster_by="id",
    partition_by={
      "field": "date_day",
      "data_type": "date"
    }
  )
}}

{#
  Fixture model for the `insert_overwrite` incremental strategy.

  First run: builds the table with four records in the 2020-01-01 partition.
  Incremental run: emits only the replacement records, so that every partition
  present in this run's output is overwritten wholesale. The four original
  2020-01-01 records must NOT be selected again here, otherwise they would be
  written back into the partition they are supposed to be replaced in.

  Expected final contents: ids 10 and 20 in 2020-01-01, ids 30 and 40 in
  2020-01-02.
#}

with data as (

    {% if not is_incremental() %}

    select 1 as id, cast('2020-01-01' as date) as date_day union all
    select 2 as id, cast('2020-01-01' as date) as date_day union all
    select 3 as id, cast('2020-01-01' as date) as date_day union all
    select 4 as id, cast('2020-01-01' as date) as date_day

    {% else %}

    -- we want to overwrite the 4 records in the 2020-01-01 partition
    -- with the 2 records below, but add two more in the 2020-01-02 partition
    select 10 as id, cast('2020-01-01' as date) as date_day union all
    select 20 as id, cast('2020-01-01' as date) as date_day union all
    select 30 as id, cast('2020-01-02' as date) as date_day union all
    select 40 as id, cast('2020-01-02' as date) as date_day

    {% endif %}
)

select * from data

{% if is_incremental() %}
-- filter on the partition column; `data` has no other date/time column
where date_day >= _dbt_max_partition
{% endif %}
8 changes: 6 additions & 2 deletions test/integration/022_bigquery_test/test_scripting.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ def profile_config(self):

def assert_incrementals(self):
    """Build the project twice, load the seeds, and compare the overwrite model.

    Both dbt runs are expected to return three results. After the second
    (incremental) run the seeds are loaded and the ``incremental_overwrite``
    table is compared with ``incremental_overwrite_expected``.

    Raises:
        AssertionError: via the ``assert*`` helpers on ``self`` when a run
            returns an unexpected number of results or the tables differ.
    """
    # First run: initial build of every model.
    results = self.run_dbt()
    self.assertEqual(len(results), 3)

    # Second run: incremental build. Capture this run's results so the
    # length check applies to it rather than re-checking the first run.
    results = self.run_dbt()
    self.assertEqual(len(results), 3)

    # Load the expected data; the return value is not inspected.
    self.run_dbt(['seed'])

    self.assertTablesEqual('incremental_overwrite', 'incremental_overwrite_expected')
6 changes: 3 additions & 3 deletions test/integration/052_column_quoting/test_column_quotes.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def test_snowflake_column_quotes(self):

@use_profile('bigquery')
def test_bigquery_column_quotes(self):
self._run_columnn_quotes()
self._run_columnn_quotes(strategy='insert_overwrite')


class TestColumnQuotingDisabled(BaseColumnQuotingTest):
Expand Down Expand Up @@ -77,7 +77,7 @@ def test_snowflake_column_quotes(self):

@use_profile('bigquery')
def test_bigquery_column_quotes(self):
self._run_columnn_quotes()
self._run_columnn_quotes(strategy='insert_overwrite')

@use_profile('snowflake')
def test_snowflake_column_quotes_merged(self):
Expand Down Expand Up @@ -115,7 +115,7 @@ def test_snowflake_column_quotes(self):

@use_profile('bigquery')
def test_bigquery_column_quotes(self):
self._run_columnn_quotes()
self._run_columnn_quotes(strategy='insert_overwrite')

@use_profile('snowflake')
def test_snowflake_column_quotes_merged(self):
Expand Down