diff --git a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql
index b550c0ca1a2..18a0c0bc350 100644
--- a/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql
+++ b/plugins/bigquery/dbt/include/bigquery/macros/materializations/incremental.sql
@@ -15,51 +15,65 @@
 
 {% endmacro %}
 
-{% macro bq_partition_merge(strategy, tmp_relation, target_relation, sql, unique_key, partition_by, dest_columns) %}
+{% macro bq_insert_overwrite(tmp_relation, target_relation, sql, unique_key, partition_by, partitions, dest_columns) %}
   {%- set partition_type =
       'date' if partition_by.data_type in ('timestamp, datetime')
       else partition_by.data_type -%}
 
-  {% set predicate -%}
-    {{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_upsert)
-  {%- endset %}
-
-  {%- set source_sql -%}
-  (
-    select * from {{ tmp_relation }}
-  )
-  {%- endset -%}
-
-  -- generated script to merge partitions into {{ target_relation }}
-  declare dbt_partitions_for_upsert array<{{ partition_type }}>;
-  declare _dbt_max_partition {{ partition_by.data_type }};
-
-  set _dbt_max_partition = (
-      select max({{ partition_by.field }}) from {{ this }}
-  );
-
-  -- 1. create a temp table
-  {{ create_table_as(True, tmp_relation, sql) }}
-
-  -- 2. define partitions to update
-  set (dbt_partitions_for_upsert) = (
-      select as struct
-          array_agg(distinct {{ partition_by.render() }})
-      from {{ tmp_relation }}
-  );
+  {% if partitions is not none and partitions != [] %} {# static #}
+
+      {% set predicate -%}
+          {{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in (
+              {{ partitions | join (', ') }}
+          )
+      {%- endset %}
+
+      {%- set source_sql -%}
+        (
+          {{sql}}
+        )
+      {%- endset -%}
+
+      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }}
+
+  {% else %} {# dynamic #}
+
+      {% set predicate -%}
+          {{ partition_by.render(alias='DBT_INTERNAL_DEST') }} in unnest(dbt_partitions_for_replacement)
+      {%- endset %}
+
+      {%- set source_sql -%}
+      (
+        select * from {{ tmp_relation }}
+      )
+      {%- endset -%}
+
+      -- generated script to merge partitions into {{ target_relation }}
+      declare dbt_partitions_for_replacement array<{{ partition_type }}>;
+      declare _dbt_max_partition {{ partition_by.data_type }};
+
+      set _dbt_max_partition = (
+          select max({{ partition_by.field }}) from {{ this }}
+      );
+
+      -- 1. create a temp table
+      {{ create_table_as(True, tmp_relation, sql) }}
+
+      -- 2. define partitions to update
+      set (dbt_partitions_for_replacement) = (
+          select as struct
+              array_agg(distinct {{ partition_by.render() }})
+          from {{ tmp_relation }}
+      );
+
+      -- 3. run the merge statement
+      {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
+
+      -- 4. clean up the temp table
+      drop table if exists {{ tmp_relation }}
 
-  -- 3. run the merge statement
-  {% if strategy == 'merge' %}
-  {{ get_merge_sql(target_relation, source_sql, unique_key, dest_columns, [predicate]) }};
-  {% elif strategy == 'insert_overwrite' %}
-  {{ get_insert_overwrite_merge_sql(target_relation, source_sql, dest_columns, [predicate]) }};
-  {% else %}
-  {% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %}
   {% endif %}
 
-  -- 4. clean up the temp table
-  drop table if exists {{ tmp_relation }}
-
 {% endmacro %}
 
 
@@ -77,6 +91,7 @@
 
   {%- set raw_partition_by = config.get('partition_by', none) -%}
   {%- set partition_by = adapter.parse_partition_by(raw_partition_by) -%}
+  {%- set partitions = config.get('partitions', none) -%}
   {%- set cluster_by = config.get('cluster_by', none) -%}
 
   {{ run_hooks(pre_hooks) }}
@@ -98,14 +113,22 @@
       {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
 
       {#-- if partitioned, use BQ scripting to get the range of partition values to be updated --#}
-      {% if partition_by is not none %}
-        {% set build_sql = bq_partition_merge(
-            strategy,
+      {% if strategy == 'insert_overwrite' %}
+
+        {% set missing_partition_msg -%}
+          The 'insert_overwrite' strategy requires the `partition_by` config.
+        {%- endset %}
+        {% if partition_by is none %}
+          {% do exceptions.raise_compiler_error(missing_partition_msg) %}
+        {% endif %}
+
+        {% set build_sql = bq_insert_overwrite(
             tmp_relation,
             target_relation,
             sql,
             unique_key,
             partition_by,
+            partitions,
            dest_columns) %}
 
       {% else %}
diff --git a/test/integration/052_column_quoting/test_column_quotes.py b/test/integration/052_column_quoting/test_column_quotes.py
index 3546d7a2fb8..b298df4bfa8 100644
--- a/test/integration/052_column_quoting/test_column_quotes.py
+++ b/test/integration/052_column_quoting/test_column_quotes.py
@@ -47,7 +47,7 @@ def test_snowflake_column_quotes(self):
 
     @use_profile('bigquery')
     def test_bigquery_column_quotes(self):
-        self._run_columnn_quotes(strategy='insert_overwrite')
+        self._run_columnn_quotes(strategy='merge')
 
 
 class TestColumnQuotingDisabled(BaseColumnQuotingTest):
@@ -75,10 +75,6 @@ def test_redshift_column_quotes(self):
     def test_snowflake_column_quotes(self):
         self._run_columnn_quotes()
 
-    @use_profile('bigquery')
-    def test_bigquery_column_quotes(self):
-        self._run_columnn_quotes(strategy='insert_overwrite')
-
     @use_profile('snowflake')
     def test_snowflake_column_quotes_merged(self):
         self._run_columnn_quotes(strategy='merge')
@@ -113,10 +109,6 @@ def test_redshift_column_quotes(self):
     def test_snowflake_column_quotes(self):
         self._run_columnn_quotes()
 
-    @use_profile('bigquery')
-    def test_bigquery_column_quotes(self):
-        self._run_columnn_quotes(strategy='insert_overwrite')
-
     @use_profile('snowflake')
     def test_snowflake_column_quotes_merged(self):
         self._run_columnn_quotes(strategy='merge')
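Usage sketch (illustrative only, not part of the diff): a model can opt into the static branch of bq_insert_overwrite by listing the partitions to replace in the new `partitions` config. The model, relation, and column names below are made up; the partition values must be valid BigQuery expressions, since they are joined directly into the merge predicate shown in the macro.

    -- models/sessions.sql (hypothetical)
    {% set partitions_to_replace = [
        "date('2020-03-01')",
        "date('2020-03-02')"
    ] %}

    {{ config(
        materialized = 'incremental',
        incremental_strategy = 'insert_overwrite',
        partition_by = {'field': 'session_date', 'data_type': 'date'},
        partitions = partitions_to_replace
    ) }}

    select * from {{ ref('events') }}
    {% if is_incremental() %}
    -- only reprocess the partitions that will be overwritten
    where session_date in ({{ partitions_to_replace | join(', ') }})
    {% endif %}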
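When no `partitions` config is supplied, the macro falls back to the dynamic branch: it builds the temp table, collects the distinct partition values it contains into dbt_partitions_for_replacement, and overwrites only those partitions. That branch also declares `_dbt_max_partition`, so a model can reference it to limit how much of the source is rescanned. A minimal sketch, again with hypothetical names:

    -- models/page_views.sql (hypothetical)
    {{ config(
        materialized = 'incremental',
        incremental_strategy = 'insert_overwrite',
        partition_by = {'field': 'event_date', 'data_type': 'date'}
    ) }}

    select * from {{ ref('events') }}
    {% if is_incremental() %}
    -- _dbt_max_partition is declared by the generated script in the dynamic branch
    where event_date >= _dbt_max_partition
    {% endif %}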