-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix for re-used check cols in snapshots #1614
Changes from 5 commits
5e6e746
4df0bbd
a2e801c
35d1a7a
329145c
b6e7351
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,16 +35,17 @@ | |
{# | ||
Create SCD Hash SQL fields cross-db | ||
#} | ||
{% macro snapshot_hash_arguments(args) %} | ||
{% macro snapshot_hash_arguments(args) -%} | ||
{{ adapter_macro('snapshot_hash_arguments', args) }} | ||
{% endmacro %} | ||
{%- endmacro %} | ||
|
||
|
||
{% macro default__snapshot_hash_arguments(args) %} | ||
{% macro default__snapshot_hash_arguments(args) -%} | ||
md5({% for arg in args %} | ||
coalesce(cast({{ arg }} as varchar ), '') {% if not loop.last %} || '|' || {% endif %} | ||
coalesce(cast({{ arg }} as varchar ), '') | ||
{% if not loop.last %} || '|' || {% endif %} | ||
{% endfor %}) | ||
{% endmacro %} | ||
{%- endmacro %} | ||
|
||
|
||
{# | ||
|
@@ -62,7 +63,7 @@ | |
{# | ||
Core strategy definitions | ||
#} | ||
{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config) %} | ||
{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} | ||
{% set primary_key = config['unique_key'] %} | ||
{% set updated_at = config['updated_at'] %} | ||
|
||
|
@@ -81,7 +82,7 @@ | |
{% endmacro %} | ||
|
||
|
||
{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config) %} | ||
{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %} | ||
{% set check_cols_config = config['check_cols'] %} | ||
{% set primary_key = config['unique_key'] %} | ||
{% set updated_at = snapshot_get_time() %} | ||
|
@@ -106,7 +107,18 @@ | |
) | ||
{%- endset %} | ||
|
||
{% set scd_id_cols = [primary_key] + (check_cols | list) %} | ||
{% if target_exists %} | ||
{% set row_version -%} | ||
( | ||
select count(*) from {{ snapshotted_rel }} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What happens if there are two rows with the same primary key in one snapshot operation? That is obviously user error, but do we raise an error about it, or just give bad results? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think that will result in "undefined behavior" -- the exact outcome depends on:
Whichever way you slice it, though, dbt is going to do something undesirable if the specified primary key is duplicated in the snapshot operation. On pg/redshift, you'll end up with dupes in your snapshot table. On snowflake/bq, you should see an error from the merge statement. We could:
|
||
where {{ snapshotted_rel }}.dbt_unique_key = {{ primary_key }} | ||
) | ||
{%- endset %} | ||
{% set scd_id_cols = [primary_key, row_version] + (check_cols | list) %} | ||
{% else %} | ||
{% set scd_id_cols = [primary_key] + (check_cols | list) %} | ||
{% endif %} | ||
|
||
{% set scd_id_expr = snapshot_hash_arguments(scd_id_cols) %} | ||
|
||
{% do return({ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -202,15 +202,31 @@ def raw_execute(self, sql, fetch=False): | |
|
||
def execute(self, sql, auto_begin=False, fetch=None): | ||
# auto_begin is ignored on bigquery, and only included for consistency | ||
_, iterator = self.raw_execute(sql, fetch=fetch) | ||
query_job, iterator = self.raw_execute(sql, fetch=fetch) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nice! |
||
|
||
if fetch: | ||
res = self.get_table_from_response(iterator) | ||
else: | ||
res = dbt.clients.agate_helper.empty_table() | ||
|
||
# If we get here, the query succeeded | ||
status = 'OK' | ||
if query_job.statement_type == 'CREATE_VIEW': | ||
status = 'CREATE VIEW' | ||
|
||
elif query_job.statement_type == 'CREATE_TABLE_AS_SELECT': | ||
conn = self.get_thread_connection() | ||
client = conn.handle | ||
table = client.get_table(query_job.destination) | ||
status = 'CREATE TABLE ({})'.format(table.num_rows) | ||
|
||
elif query_job.statement_type in ['INSERT', 'DELETE', 'MERGE']: | ||
status = '{} ({})'.format( | ||
query_job.statement_type, | ||
query_job.num_dml_affected_rows | ||
) | ||
|
||
else: | ||
status = 'OK' | ||
|
||
return status, res | ||
|
||
def create_bigquery_table(self, database, schema, table_name, callback, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,9 @@ | ||
{% macro bigquery__snapshot_hash_arguments(args) %} | ||
to_hex(md5(concat({% for arg in args %}coalesce(cast({{ arg }} as string), ''){% if not loop.last %}, '|',{% endif %}{% endfor %}))) | ||
{% endmacro %} | ||
{% macro bigquery__snapshot_hash_arguments(args) -%} | ||
to_hex(md5(concat({% for arg in args %} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I have the same for-loop whitespace formatting questions here as I did on snowflake |
||
coalesce(cast({{ arg }} as string), ''){% if not loop.last %}, '|',{% endif -%} | ||
{% endfor %} | ||
))) | ||
{%- endmacro %} | ||
|
||
{% macro bigquery__create_columns(relation, columns) %} | ||
{{ adapter.alter_table_add_columns(relation, columns) }} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
|
||
|
||
with query as ( | ||
|
||
-- check that the current value for id=1 is red | ||
select case when ( | ||
select count(*) | ||
from {{ ref('check_cols_cycle') }} | ||
where id = 1 and color = 'red' and dbt_valid_to is null | ||
) = 1 then 0 else 1 end as failures | ||
|
||
union all | ||
|
||
-- check that the previous 'red' value for id=1 is invalidated | ||
select case when ( | ||
select count(*) | ||
from {{ ref('check_cols_cycle') }} | ||
where id = 1 and color = 'red' and dbt_valid_to is not null | ||
) = 1 then 0 else 1 end as failures | ||
|
||
union all | ||
|
||
-- check that there's only one current record for id=2 | ||
select case when ( | ||
select count(*) | ||
from {{ ref('check_cols_cycle') }} | ||
where id = 2 and color = 'pink' and dbt_valid_to is null | ||
) = 1 then 0 else 1 end as failures | ||
|
||
union all | ||
|
||
-- check that the previous value for id=2 is represented | ||
select case when ( | ||
select count(*) | ||
from {{ ref('check_cols_cycle') }} | ||
where id = 2 and color = 'green' and dbt_valid_to is not null | ||
) = 1 then 0 else 1 end as failures | ||
|
||
union all | ||
|
||
-- check that there are 5 records total in the table | ||
select case when ( | ||
select count(*) | ||
from {{ ref('check_cols_cycle') }} | ||
) = 5 then 0 else 1 end as failures | ||
|
||
) | ||
|
||
select * | ||
from query | ||
where failures = 1 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,33 @@ | ||
|
||
{% snapshot check_cols_cycle %} | ||
|
||
{{ | ||
config( | ||
target_database=database, | ||
target_schema=schema, | ||
unique_key='id', | ||
strategy='check', | ||
check_cols=['color'] | ||
) | ||
}} | ||
|
||
{% if var('version') == 1 %} | ||
|
||
select 1 as id, 'red' as color union all | ||
select 2 as id, 'green' as color | ||
|
||
{% elif var('version') == 2 %} | ||
|
||
select 1 as id, 'blue' as color union all | ||
select 2 as id, 'green' as color | ||
|
||
{% elif var('version') == 3 %} | ||
|
||
select 1 as id, 'red' as color union all | ||
select 2 as id, 'pink' as color | ||
|
||
{% else %} | ||
{% do exceptions.raise_compiler_error("Got bad version: " ~ var('version')) %} | ||
{% endif %} | ||
|
||
{% endsnapshot %} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
from test.integration.base import DBTIntegrationTest, use_profile | ||
import dbt.exceptions | ||
|
||
|
||
class TestSimpleSnapshotFiles(DBTIntegrationTest): | ||
NUM_SNAPSHOT_MODELS = 1 | ||
|
||
@property | ||
def schema(self): | ||
return "simple_snapshot_004" | ||
|
||
@property | ||
def models(self): | ||
return "models" | ||
|
||
@property | ||
def project_config(self): | ||
return { | ||
"snapshot-paths": ['check-snapshots'], | ||
"test-paths": ['check-snapshots-expected'], | ||
"source-paths": [], | ||
} | ||
|
||
def test_snapshot_check_cols_cycle(self): | ||
results = self.run_dbt(["snapshot", '--vars', 'version: 1']) | ||
self.assertEqual(len(results), 1) | ||
|
||
results = self.run_dbt(["snapshot", '--vars', 'version: 2']) | ||
self.assertEqual(len(results), 1) | ||
|
||
results = self.run_dbt(["snapshot", '--vars', 'version: 3']) | ||
self.assertEqual(len(results), 1) | ||
|
||
def assert_expected(self): | ||
self.run_dbt(['test', '--data', '--vars', 'version: 3']) | ||
|
||
@use_profile('snowflake') | ||
def test__snowflake__simple_snapshot(self): | ||
self.test_snapshot_check_cols_cycle() | ||
self.assert_expected() | ||
|
||
@use_profile('postgres') | ||
def test__postgres__simple_snapshot(self): | ||
self.test_snapshot_check_cols_cycle() | ||
self.assert_expected() | ||
|
||
@use_profile('bigquery') | ||
def test__bigquery__simple_snapshot(self): | ||
self.test_snapshot_check_cols_cycle() | ||
self.assert_expected() | ||
|
||
@use_profile('redshift') | ||
def test__redshift__simple_snapshot(self): | ||
self.test_snapshot_check_cols_cycle() | ||
self.assert_expected() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to make this `{%- for arg in args -%}...{%- endfor %}`, or does that shove all of this onto one line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah - I was trying to make the whitespace a little cleaner here - it was pretty hard to find what I was looking for in the logs while it was all on one line (especially with the new subquery).
This output looks something like:
It isn't the most beautiful SQL I've ever seen, but it should be OK.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding the whitespace controls makes it look like this:
I will update!