BQ incremental merge statements respect dest date partitioning #1034

Closed
jtcohen6 opened this issue Sep 28, 2018 · 10 comments · Fixed by #2140

@jtcohen6 (Contributor) commented Sep 28, 2018

Issue (feature?)

Description

When dbt generates merge statements for BigQuery incremental models, we can take advantage of date partitioning for DBT_INTERNAL_SOURCE via the "advanced incremental" pattern. The following code properly limits the data BQ scans by filtering the "source" data with templated code:

with base as (
    
    select * from {{ref('my_model')}}
    {% if adapter.already_exists(this.schema, this.table) and not flags.FULL_REFRESH %}
        where date(partition_column) > '2018-09-26'
    {% endif %}
    
),

But the merge statement does not limit DBT_INTERNAL_DEST, i.e. the existing table to be incrementally updated. By adding one (optional) line to the merge statement's on condition, we can limit the total data scanned (and thereby the cost):

merge into `dbt_jcohen`.`my_model_2` as DBT_INTERNAL_DEST
    using (
           select * from ( ... )
        where (true) or (true) is null
    ) as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.id = DBT_INTERNAL_DEST.id
-- my addition
and DBT_INTERNAL_DEST.partition_column > '2018-09-26'

Where '2018-09-26' is a user-supplied config value fed into the sql_where or the "advanced incremental" CTE, and here also into the merge condition.

Results

In my local example, the BQ incremental merge (status quo) scans 4.32 GB, and adding the additional line decreases the query scan to 1.87 GB, since the full destination table ({{this}}) is itself 3.18 GB.

System information

The output of dbt --version:

installed version: 0.11.1
   latest version: 0.11.1

Up to date!
@jtcohen6 (Contributor, Author) commented Mar 18, 2019

My approach to this at the time: override the incremental materialization with a custom materialization, which I called incremental_merge. (I ran into some odd issues when I tried to override it with a custom materialization of the same name.)

Everything in the materialization was exactly the same, except I added dest_where as an expected config arg, which the materialization then fed into a custom (overridden) version of get_merge_sql.

The first and simpler version expected the config argument to look something like where date(my_timestamp_column) > '2019-03-16', and pulled it through the materialization context accordingly:

-- in custom materialization
{%- set dest_where = config.get('dest_where') -%}
  {%- if not dest_where -%}
    {{ exceptions.raise_compiler_error("Must supply a dest_where clause") }}
  {%- endif -%}
-- updated macro
{% macro get_merge_sql(target, source, unique_key, dest_columns, dest_where) -%}
    -- standard stuff
    {%- set dest_cols_csv = dest_columns | map(attribute="name") | join(', ') -%}

    merge into {{ target }} as DBT_INTERNAL_DEST
    using {{ source }} as DBT_INTERNAL_SOURCE

    {% if unique_key %}
        on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
    {% else %}
        on FALSE
    {% endif %}
    -- my only addition
        and {{dest_where}}

-- rest of macro with match conditions
{% endmacro %}
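
For reference, a model using this custom materialization would pass the filter straight through its config block. A minimal sketch (the model name, column, and date are just placeholders):

-- models/my_model.sql
{{
  config(
    materialized='incremental_merge',
    unique_key='id',
    partition_by='my_timestamp_column',
    -- dest_where is concatenated after "and" in the merge's on clause
    dest_where="date(my_timestamp_column) > '2019-03-16'"
  )
}}

select * from {{ ref('my_staging_model') }}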

The fancier version filled a default value of dest_where based on the value of partition_by:

{%- set dest_where = config.get('dest_where') -%}
  {%- if not dest_where -%}
    {%- set dest_where = formatting_macro(config.get('partition_by')) -%}
  {%- endif -%}

{% if not dest_where %}
    {{ exceptions.raise_compiler_error("Must supply a dest_where clause") }}
{% endif %}

Where formatting_macro takes a date or timestamp column name, applies logic, and returns something that looks like:

date(my_timestamp_column) between '2019-03-16' and current_date
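
A rough sketch of what that formatting macro might look like (the start date is hard-coded here purely for illustration; in practice it would come from a var or config):

{% macro formatting_macro(partition_column) %}
    {# no partition_by configured: return nothing so the materialization can raise #}
    {%- if not partition_column -%}
        {{ return('') }}
    {%- endif -%}
    {{ return("date(" ~ partition_column ~ ") between '2019-03-16' and current_date") }}
{% endmacro %}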

I think the fancier version makes a fair assumption: in order to take advantage of cost limiting by adding a column filter to the merge statement, we need to already be partitioning the table by that same column.

This approach still feels very much like a manual override. Over the past few months, I have tried to think of approaches that feel cleaner and more appropriate to include in dbt's default BQ behavior. So far, I don't have any great ideas.

@jarlainnix commented:

Hi,
is it possible to have an ETA on solving this issue? Currently I have a use case where this lack of filtering makes the incremental SQL statement scan 45 GB and 86 GB on two different tables, where the only data that could possibly change is ~1.5 GB from the last day.
As you know, these costs add up fast.
I tried to customise a solution to my problem, but I could only get as far as setting up a new macro file called merge.sql (to "replace" the standard dbt one) with the following code:

{% macro get_merge_sql(target, source, unique_key, dest_columns) -%}
  {{ adapter_macro('get_merge_sql', target, source, unique_key, dest_columns) }}
{%- endmacro %}

{% macro get_delete_insert_merge_sql(target, source, unique_key, dest_columns) -%}
  {{ adapter_macro('get_delete_insert_merge_sql', target, source, unique_key, dest_columns) }}
{%- endmacro %}

{% macro common_get_merge_sql(target, source, unique_key, dest_columns) -%}
    {%- set dest_cols_csv = dest_columns | map(attribute="name") | join(', ') -%}
    {# ----- my addition: compute the lookback date (two days back) ----- #}
    {%- set today = modules.datetime.date.today() -%}
    {%- set lookback = modules.datetime.timedelta(days=2) -%}
    {%- set yesterday = (today - lookback) -%}
    {%- set yesterday_yyyy_mm_dd = yesterday.strftime("%Y-%m-%d") -%}

    merge into {{ target }} as DBT_INTERNAL_DEST
    using {{ source }} as DBT_INTERNAL_SOURCE

    {# ----- my addition: prune the destination by its partition column ----- #}
    {% if unique_key and unique_key == 'event_id' %}
        on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
        and DBT_INTERNAL_DEST.date_partition >= '{{ yesterday_yyyy_mm_dd }}'
    {% elif unique_key %}
        on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
    {% else %}
        on FALSE
    {% endif %}
(...) standard code

It works for me, but I had to hardcode the partition column name!
How can I change the custom materialization code to add a new config variable, both on my local machine and on dbt Cloud?

@drewbanin (Contributor) commented:

Hey @jarlainnix - the problem you've identified is sort of the core challenge we need to solve!

There are two things to sort out:

  1. What field should be filtered on?
  2. What filter should dbt apply to this field?

I can see that you're filtering for only the last day of data. If this approach works for you, then that's great! I don't however think that this is a good general solution to the problem -- some projects may need to look back 2 days, or 7 days, or 30 days!

If you want to make the date_partition column dynamic, you can do that using configs!

Try something like this:

-- models/my_incremental_model.sql

{{
  config(
    materialized='incremental',
    ... other configs here ...
    filter_field="date_partition"
  )
}}

select ...

Then in the common_get_merge_sql macro, you can use:

{% set filter_field = config.get('filter_field') %}

You can then use {{ filter_field }} where you are currently using "date_partition" directly.
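
Roughly, the relevant part of your common_get_merge_sql override would then become (keeping the yesterday_yyyy_mm_dd value you already compute):

{% set filter_field = config.get('filter_field') %}

{% if unique_key %}
    on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
    {% if filter_field %}
        and DBT_INTERNAL_DEST.{{ filter_field }} >= '{{ yesterday_yyyy_mm_dd }}'
    {% endif %}
{% else %}
    on FALSE
{% endif %}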

I'd love to get a fix out for this - let me spend some time revisiting this over the next week or two :)

@jarlainnix commented Aug 2, 2019

I understand that there can be various use cases to solve and we can only solve one at a time.
I'd say that for the specific use case of:

  • adapter = BigQuery
  • materialized = incremental
  • partition_by = 'PARTITION_COLUMN'

the answer to Q1 is that the only column that makes sense to filter on is the one the table is partitioned by, PARTITION_COLUMN (set in the config as the partition_by variable). As far as I can tell, that is the only column that would enable partition pruning on a BigQuery partitioned table.

For Q2 I have an unusual suggestion to generalise the solution. Assuming the added lines in the common_get_merge_sql macro would be something like:

--    macros/common_get_merge_sql.sql
(...)
    {% if unique_key %}
        on DBT_INTERNAL_SOURCE.{{ unique_key }} = DBT_INTERNAL_DEST.{{ unique_key }}
        {# these lines #}
        and DBT_INTERNAL_DEST.PARTITION_COLUMN BETWEEN
            {{ DBT_INTERNAL_SOURCE_MIN_VALUE }}
            and {{ DBT_INTERNAL_SOURCE_MAX_VALUE }}
        {# these lines #}
    {% else %}
        on FALSE
    {% endif %}

we could run a mid-process subquery (like we do with call statement blocks in our models) that would look up the limit values for the partitions.
In the case of this example:

--    models/stg_formula_evaluated_partitioned.sql
{{ config(
    materialized='incremental',
    unique_key='event_id',
    partition_by='date_partition'
    )
}}
with deduped_events as (
    SELECT *
    FROM (
        SELECT
            *,
            ROW_NUMBER()
                OVER (PARTITION BY attributes.event_id) as row_number
        FROM `{{ source('user_events', 'formula_evaluated') }}_*`
        {% if is_incremental() %}
            where _TABLE_SUFFIX >= '{{ var('date_suffix', default=yesterday()) }}'
        {% endif %}
    )
    WHERE row_number = 1
),
cte_final as (
  select
    attributes.event_id,
    attributes.event_time,
    attributes.publish_time,
    attributes.user_id,
    attributes.app_id,
    attributes.view_id,
    attributes.request_id,
    attributes.producer,
    attributes.type,
    attributes.session_id,
    formula_evaluation_id,
    cell_id,
    column,
    row,
    execution_trigger,
    EXTRACT(DATE FROM attributes.event_time) AS date_partition
  from deduped_events
)
SELECT
  *
FROM
  cte_final

(that is my specific use case), the mid-process subquery would check:

SELECT
min(date_partition) as DBT_INTERNAL_SOURCE_MIN_VALUE
,max(date_partition) as DBT_INTERNAL_SOURCE_MAX_VALUE
from {{ stg_formula_evaluated_partitioned.sql model generated SQL }}

Here the {{ filter_field }} has to be the partition_by field set up in the config, and the min and max values would be queried in the background; the payoff from this workaround can be great.
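
As a very rough sketch, something like this inside the materialization could fetch those bounds before the merge SQL is built (tmp_relation here stands in for wherever the source data has been staged; names are placeholders):

{% call statement('partition_bounds', fetch_result=True) %}
    select
        min(date_partition) as dbt_internal_source_min_value,
        max(date_partition) as dbt_internal_source_max_value
    from {{ tmp_relation }}
{% endcall %}

{% set bounds = load_result('partition_bounds')['data'][0] %}
{# bounds[0] and bounds[1] would then be passed into get_merge_sql #}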

(sorry for the messy explanation)
Hope I've helped with something

@drewbanin (Contributor) commented:

Thanks @jarlainnix - this is really great!

@hui-zheng commented:

Hi @jarlainnix,

I agree with your first point: the {{ filter_field }} has to be the partition_by field set up in the config.

For the second point about getting min and max values, may I ask whether you actually implemented that part, or is it a thought for discussion? I can see some challenges and inefficiencies in implementing it with the current dbt design.

To get the min and max of DBT_INTERNAL_SOURCE in a mid-process step, we would first have to run stg_formula_evaluated_partitioned just as a CTE of that step, and only then compile and run the merge statement as the final step. The whole process would run stg_formula_evaluated_partitioned (DBT_INTERNAL_SOURCE) twice, which is very inefficient.

I think we could use BigQuery scripts and temporary_tables to solve this.
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-definition-language#temporary_tables

The approach is for dbt to compile a BigQuery script that runs the following steps as one BigQuery transaction for an incremental run. Taking your example:

  1. Run the stg_formula_evaluated_partitioned model and save the result as a temporary table called DBT_INTERNAL_SOURCE.
  2. Get the min and max of DBT_INTERNAL_SOURCE (the temp table) and save them as BigQuery variables.
  3. Merge DBT_INTERNAL_SOURCE (the temp table) into DBT_INTERNAL_DEST, using the partition range between the min and max.

I don't know how easy it would be to modify the get_merge_sql() macro to run a BigQuery script instead of a single query, but if we could do that, I think this might be a good approach to the problem.
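
To make that concrete, the compiled script might look roughly like this (a hand-written sketch, not actual dbt output; project, dataset, and column names are placeholders based on the model above):

-- 1. declare variables for the partition bounds (declarations must come first in a script)
declare min_partition, max_partition date;

-- 2. stage the compiled model SQL as a temp table (this select stands in for the model's compiled SQL)
create temp table dbt_internal_source as
select * from `my-project`.`dbt_dev`.`stg_formula_evaluated_partitioned__dbt_tmp`;

-- 3. capture the min and max partition values of the staged data
set (min_partition, max_partition) = (
    select as struct min(date_partition), max(date_partition)
    from dbt_internal_source
);

-- 4. merge, pruning the destination to the affected partition range only
merge into `my-project`.`dbt_dev`.`stg_formula_evaluated_partitioned` as DBT_INTERNAL_DEST
using dbt_internal_source as DBT_INTERNAL_SOURCE
on DBT_INTERNAL_SOURCE.event_id = DBT_INTERNAL_DEST.event_id
    and DBT_INTERNAL_DEST.date_partition between min_partition and max_partition
when matched then update set
    event_time = DBT_INTERNAL_SOURCE.event_time
when not matched then insert (event_id, event_time, date_partition)
    values (DBT_INTERNAL_SOURCE.event_id, DBT_INTERNAL_SOURCE.event_time, DBT_INTERNAL_SOURCE.date_partition);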

@jtcohen6 (Contributor, Author) commented:

@hui-zheng I agree with the approach you've outlined! Check out the work in #1971, which we're planning to ship in 0.16.0.

@jarlainnix commented:

@hui-zheng (I was away and just got back today)
I was going to point you to issue #1971, as @jtcohen6 did.
I still use my own fix as described above, and am eagerly awaiting the version where a better solution is implemented (in the works in #1971).
go go @jtcohen6 😄 💪

@andreic-ub commented:

Hi guys, I have the same use case for partition pruning in dbt merge statements against BQ.
Was the 'partition filter' parameter implemented?
Thanks

@jtcohen6 (Contributor, Author) commented:

Hi @andreic-ub, we ended up implementing this as a partition-aware incremental strategy called insert_overwrite (docs, discourse post)
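
For anyone landing here later, the configuration looks roughly like this (see the linked docs for the full, current syntax):

{{
  config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by={'field': 'date_partition', 'data_type': 'date'}
  )
}}

select ...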
