Skip to content

Commit

Permalink
Merge pull request #2198 from fishtown-analytics/rework/incremental-overwrite
Browse files Browse the repository at this point in the history

Rework insert_overwrite incremental strategy
  • Loading branch information
beckjake authored Mar 18, 2020
2 parents 0844be5 + 4fee33c commit f4c6272
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,51 +15,65 @@
{% endmacro %}


{# NOTE(review): this is a unified-diff capture with no +/- markers. The next two
   lines are, respectively, the REMOVED pre-change signature (bq_partition_merge,
   which took a `strategy` argument) and the ADDED post-change signature
   (bq_insert_overwrite, which drops `strategy` and adds `partitions`). Only one
   of them exists in any real version of the file. #}
{% macro bq_partition_merge(strategy, tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns) %}
{% macro bq_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns) %}
{# Map timestamp/datetime partition columns to the DATE type used for the
   partition-list script variable; other types (date, int64) pass through.
   NOTE(review): `('timestamp, datetime')` is a single parenthesized STRING, not
   the tuple ('timestamp', 'datetime') — Jinja's `in` then does a substring
   test. It happens to give the intended result for the known data types
   ('timestamp'/'datetime'/'date' all substring-match and map to 'date';
   'int64' does not), but presumably a tuple was intended — TODO confirm. #}
{%- set partition_type =
'date' if partition_by.data_type in ('timestamp, datetime')
else partition_by.data_type -%}

{# NOTE(review): the span from here down to the `array_agg(...)` SET statement
   is the REMOVED old implementation (its script variable was named
   dbt_partitions_for_upsert). The post-change implementation begins at the
   `{% if partitions ... %}` static/dynamic branch below. #}
{% set predicate -%}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_upsert)
{%- endset %}

{%- set source_sql -%}
(
select * from {{ tmp_relation }}
)
{%- endset -%}

-- generated script to merge partitions into {{ target_relation }}
declare dbt_partitions_for_upsert array<{{ partition_type }}>;
declare _dbt_max_partition {{ partition_by.data_type }};

set _dbt_max_partition = (
select max({{ partition_by.field }}) from {{ this }}
);

-- 1. create a temp table
{{ create_table_as(True, tmp_relation, sql) }}

-- 2. define partitions to update
set (dbt_partitions_for_upsert) = (
select as struct
array_agg(distinct {{ partition_by.render() }})
from {{ tmp_relation }}
);
{# STATIC branch (new code): the user listed the target partitions explicitly
   via the `partitions` config, so no temp table or BQ script is needed — the
   model SQL itself is merged with a literal IN (...) predicate. #}
{% if partitions is not none and partitions != [] %} {# static #}

{% set predicate -%}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in (
{{ partitions | join (', ') }}
)
{%- endset %}

{%- set source_sql -%}
(
{{sql}}
)
{%- endset -%}

{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }}

{# DYNAMIC branch (new code): the partitions to replace are discovered at run
   time with BigQuery scripting — materialize the model into a temp table,
   collect its distinct partition values into an array variable, then merge
   only those partitions. #}
{% else %} {# dynamic #}

{% set predicate -%}
{{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
{%- endset %}

{%- set source_sql -%}
(
select * from {{ tmp_relation }}
)
{%- endset -%}

-- generated script to merge partitions into {{ target_relation }}
declare dbt_partitions_for_replacement array<{{ partition_type }}>;
{# _dbt_max_partition is exposed so the model SQL compiled into the temp table
   below can reference the current max partition of the existing relation. #}
declare _dbt_max_partition {{ partition_by.data_type }};

set _dbt_max_partition = (
select max({{ partition_by.field }}) from {{ this }}
);

-- 1. create a temp table
{{ create_table_as(True, tmp_relation, sql) }}

-- 2. define partitions to update
set (dbt_partitions_for_replacement) = (
select as struct
array_agg(distinct {{ partition_by.render() }})
from {{ tmp_relation }}
);

-- 3. run the merge statement
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};

-- 4. clean up the temp table
drop table if exists {{ tmp_relation }}

{# NOTE(review): the remainder down to the final drop-table is the REMOVED old
   strategy dispatch (merge vs insert_overwrite vs error) — strategy selection
   moved up into the materialization in the new code. Also note the closing
   `{% endif %}` for the static/dynamic branch above is not visible in this
   capture; presumably it sits just before `{% endmacro %}` in the real file —
   verify against the repository. #}
-- 3. run the merge statement
{% if strategy == 'merge' %}
{{ get_merge_sql(target_relation, source_sql, unique_key, dest_columns, [predicate]) }};
{% elif strategy == 'insert_overwrite' %}
{{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
{% else %}
{% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %}
{% endif %}

-- 4. clean up the temp table
drop table if exists {{ tmp_relation }}

{% endmacro %}


Expand All @@ -77,6 +91,7 @@

{%- set raw_partition_by = config.get('partition_by', none) -%}
{%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
{%- set partitions = config.get('partitions', none) -%}
{%- set cluster_by = config.get('cluster_by', none) -%}

{{ run_hooks(pre_hooks) }}
Expand All @@ -98,14 +113,22 @@
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}

{#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
{% if partition_by is not none %}
{% set build_sql = bq_partition_merge(
strategy,
{% if strategy == 'insert_overwrite' %}

{% set missing_partition_msg -%}
The 'insert_overwrite' strategy requires the `partition_by` config.
{%- endset %}
{% if partition_by is none %}
{% do exceptions.raise_compiler_error(missing_partition_msg) %}
{% endif %}

{% set build_sql = bq_insert_overwrite(
tmp_relation,
target_relation,
sql,
unique_key,
partition_by,
partitions,
dest_columns) %}

{% else %}
Expand Down
10 changes: 1 addition & 9 deletions test/integration/052_column_quoting/test_column_quotes.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def test_snowflake_column_quotes(self):

@use_profile('bigquery')
def test_bigquery_column_quotes(self):
# NOTE(review): diff capture — the first call below is the pre-change line
# (strategy='insert_overwrite', removed by this commit) and the second is its
# replacement (strategy='merge'); only one call exists in each real version.
# The helper name '_run_columnn_quotes' (doubled 'n') matches every other call
# in this file, so it is presumably the helper's actual (typo'd) name — verify.
self._run_columnn_quotes(strategy='insert_overwrite')
self._run_columnn_quotes(strategy='merge')


class TestColumnQuotingDisabled(BaseColumnQuotingTest):
Expand Down Expand Up @@ -75,10 +75,6 @@ def test_redshift_column_quotes(self):
def test_snowflake_column_quotes(self):
self._run_columnn_quotes()

# NOTE(review): diff capture — this whole bigquery test is REMOVED by this
# commit (per the diff header "-75,10 +75,6"); presumably because the reworked
# 'insert_overwrite' strategy now requires a `partition_by` config this test
# class does not set — confirm against the repository.
@use_profile('bigquery')
def test_bigquery_column_quotes(self):
self._run_columnn_quotes(strategy='insert_overwrite')

# Retained by this commit: the snowflake 'merge'-strategy variant.
@use_profile('snowflake')
def test_snowflake_column_quotes_merged(self):
self._run_columnn_quotes(strategy='merge')
Expand Down Expand Up @@ -113,10 +109,6 @@ def test_redshift_column_quotes(self):
def test_snowflake_column_quotes(self):
self._run_columnn_quotes()

# NOTE(review): diff capture — this bigquery test is REMOVED by this commit
# (per the diff header "-113,10 +109,6"), mirroring the identical removal in
# the previous test class; presumably the reworked 'insert_overwrite' strategy
# now requires a `partition_by` config this class does not set — confirm.
@use_profile('bigquery')
def test_bigquery_column_quotes(self):
self._run_columnn_quotes(strategy='insert_overwrite')

# Retained by this commit: the snowflake 'merge'-strategy variant.
@use_profile('snowflake')
def test_snowflake_column_quotes_merged(self):
self._run_columnn_quotes(strategy='merge')
Expand Down

0 comments on commit f4c6272

Please sign in to comment.