Skip to content

Commit

Permalink
archive cleanup, fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbanin committed May 22, 2019
1 parent 8a00110 commit e6f7284
Show file tree
Hide file tree
Showing 16 changed files with 84 additions and 41 deletions.
12 changes: 12 additions & 0 deletions core/dbt/include/global_project/macros/adapters/common.sql
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@
{%- endif -%}
{%- endmacro %}

{% macro get_columns_in_query(select_sql) %}
{% call statement('get_columns_in_query', fetch_result=True, auto_begin=False) -%}
select * from (
{{ select_sql }}
) as __dbt_sbq
where false
limit 0
{% endcall %}

{{ return(load_result('get_columns_in_query').table.columns | map(attribute='name') | list) }}
{% endmacro %}

{% macro create_schema(database_name, schema_name) -%}
{{ adapter_macro('create_schema', database_name, schema_name) }}
{% endmacro %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@

archived_data as (

select * from {{ target_relation }}
select *,
{{ strategy.unique_key }} as dbt_pk

from {{ target_relation }}

),

Expand Down Expand Up @@ -82,7 +85,10 @@

archived_data as (

select * from {{ target_relation }}
select *,
{{ strategy.unique_key }} as dbt_pk

from {{ target_relation }}

),

Expand Down Expand Up @@ -112,7 +118,6 @@
{% macro build_archive_table(strategy, sql) %}

select *,
{{ strategy.unique_key }} as dbt_pk,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.scd_id }} as dbt_scd_id,
{{ strategy.updated_at }} as dbt_valid_from,
Expand Down Expand Up @@ -190,7 +195,7 @@
{%- endif -%}

{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro("archived_data", "source_data", config) %}
{% set strategy = strategy_macro(model, "archived_data", "source_data", config) %}

{% if not target_relation_exists %}

Expand All @@ -210,12 +215,15 @@
{% do adapter.expand_target_column_types(temp_table=target_table ~ "__dbt_tmp",
to_relation=target_relation) %}

{% set excluded_cols = ['dbt_change_type', 'dbt_pk'] %}
{% set missing_columns = adapter.get_missing_columns(tmp_relation, target_relation)
| rejectattr("name", "equalto", "dbt_change_type")
| rejectattr("name", "in", excluded_cols)
| rejectattr("name", "in", excluded_cols | upper)
| list %}

{% set dest_columns = source_columns
| rejectattr("name", "equalto", "dbt_change_type")
| rejectattr("name", "in", excluded_cols)
| rejectattr("name", "in", excluded_cols | upper)
| list %}

{% do create_columns(target_relation, missing_columns) %}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,11 @@
{{ current_timestamp() }}
{%- endmacro %}

{#-- TODO : This doesn't belong here #}
{% macro snowflake__archive_get_time() -%}
to_timestamp_ntz({{ current_timestamp() }})
{%- endmacro %}


{#
Core strategy definitions
#}
{% macro archive_timestamp_strategy(archived_rel, current_rel, config) %}
{% macro archive_timestamp_strategy(node, archived_rel, current_rel, config) %}
{% set primary_key = config['unique_key'] %}
{% set updated_at = config['updated_at'] %}

Expand All @@ -86,11 +81,19 @@
{% endmacro %}


{% macro archive_check_strategy(archived_rel, current_rel, config) %}
{% macro archive_check_strategy(node, archived_rel, current_rel, config) %}
{% set check_cols_config = config['check_cols'] %}
{% set primary_key = config['unique_key'] %}
{% set check_cols = config['check_cols'] %}
{% set updated_at = archive_get_time() %}

{% if check_cols_config == 'all' %}
{% set check_cols = get_columns_in_query(node['injected_sql']) %}
{% elif check_cols_config is iterable and (check_cols_config | length) > 0 %}
{% set check_cols = check_cols_config %}
{% else %}
{% do exceptions.raise_compiler_error("Invalid value for 'check_cols': " ~ check_cols_config) %}
{% endif %}

{% set row_changed_expr -%}
(
{% for col in check_cols %}
Expand Down
25 changes: 17 additions & 8 deletions core/dbt/parser/archives.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@
import os


def set_archive_attributes(node):
config_keys = {
'target_database': 'database',
'target_schema': 'schema'
}

for config_key, node_key in config_keys.items():
if config_key in node.config:
setattr(node, node_key, node.config[config_key])

return node


class ArchiveParser(MacrosKnownParser):
@classmethod
def parse_archives_from_project(cls, config):
Expand Down Expand Up @@ -93,11 +106,8 @@ def load_and_parse(self):
self.all_projects.get(archive.package_name),
archive_config=archive_config)

# TODO : Test this
parsed_node.database = parsed_node.config['target_database']
parsed_node.schema = parsed_node.config['target_schema']

to_return[node_path] = parsed_node
# TODO : Add tests for this
to_return[node_path] = set_archive_attributes(parsed_node)

return to_return

Expand Down Expand Up @@ -144,9 +154,8 @@ def get_fqn(cls, node, package_project_config, extra=[]):
def validate_archives(node):
if node.resource_type == NodeType.Archive:
try:
node.database = node.config['target_database']
node.schema = node.config['target_schema']
return ParsedArchiveNode(**node.to_shallow_dict())
parsed_node = ParsedArchiveNode(**node.to_shallow_dict())
return set_archive_attributes(parsed_node)

except dbt.exceptions.JSONValidationException as exc:
raise dbt.exceptions.CompilationException(str(exc), node)
Expand Down
4 changes: 4 additions & 0 deletions plugins/postgres/dbt/include/postgres/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,7 @@
{% macro postgres__current_timestamp() -%}
now()
{%- endmacro %}

{% macro postgres__archive_get_time() -%}
{{ current_timestamp() }}::timestamp without time zone
{%- endmacro %}
7 changes: 4 additions & 3 deletions plugins/redshift/dbt/include/redshift/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,11 @@
{% macro redshift__check_schema_exists(information_schema, schema) -%}
{{ return(postgres__check_schema_exists(information_schema, schema)) }}
{%- endmacro %}
list_schemas

%}

{% macro redshift__current_timestamp() -%}
getdate()
{%- endmacro %}

{% macro redshift__archive_get_time() -%}
{{ current_timestamp() }}::timestamp
{%- endmacro %}
4 changes: 4 additions & 0 deletions plugins/snowflake/dbt/include/snowflake/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@
convert_timezone('UTC', current_timestamp())
{%- endmacro %}

{% macro snowflake__archive_get_time() -%}
to_timestamp_ntz({{ current_timestamp() }})
{%- endmacro %}


{% macro snowflake__rename_relation(from_relation, to_relation) -%}
{% call statement('rename_relation') -%}
Expand Down
8 changes: 4 additions & 4 deletions test/integration/004_simple_archive_test/seed.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ create table {database}.{schema}.archive_expected (
updated_at TIMESTAMP WITHOUT TIME ZONE,
dbt_valid_from TIMESTAMP WITHOUT TIME ZONE,
dbt_valid_to TIMESTAMP WITHOUT TIME ZONE,
dbt_scd_id VARCHAR(256),
dbt_scd_id VARCHAR(32),
dbt_updated_at TIMESTAMP WITHOUT TIME ZONE
);

Expand Down Expand Up @@ -93,7 +93,7 @@ create table {database}.{schema}.archive_castillo_expected (
updated_at TIMESTAMP WITHOUT TIME ZONE,
dbt_valid_from TIMESTAMP WITHOUT TIME ZONE,
dbt_valid_to TIMESTAMP WITHOUT TIME ZONE,
dbt_scd_id VARCHAR(256),
dbt_scd_id VARCHAR(32),
dbt_updated_at TIMESTAMP WITHOUT TIME ZONE
);

Expand Down Expand Up @@ -139,7 +139,7 @@ create table {database}.{schema}.archive_alvarez_expected (
updated_at TIMESTAMP WITHOUT TIME ZONE,
dbt_valid_from TIMESTAMP WITHOUT TIME ZONE,
dbt_valid_to TIMESTAMP WITHOUT TIME ZONE,
dbt_scd_id VARCHAR(256),
dbt_scd_id VARCHAR(32),
dbt_updated_at TIMESTAMP WITHOUT TIME ZONE
);

Expand Down Expand Up @@ -185,7 +185,7 @@ create table {database}.{schema}.archive_kelly_expected (
updated_at TIMESTAMP WITHOUT TIME ZONE,
dbt_valid_from TIMESTAMP WITHOUT TIME ZONE,
dbt_valid_to TIMESTAMP WITHOUT TIME ZONE,
dbt_scd_id VARCHAR(256),
dbt_scd_id VARCHAR(32),
dbt_updated_at TIMESTAMP WITHOUT TIME ZONE
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
updated_at='updated_at',
)
}}
select * from `{{database}}`.`{{schema}}`.seed
select * from `{{target.database}}`.`{{schema}}`.seed

{% endarchive %}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
updated_at='updated_at',
)
}}
select * from {{database}}.{{schema}}.seed
select * from {{target.database}}.{{schema}}.seed

{% endarchive %}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@
updated_at='updated_at',
)
}}
select * from {{database}}.{{schema}}.super_long
select * from {{target.database}}.{{schema}}.super_long
{% endarchive %}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@
updated_at='updated_at',
)
}}
select * from {{database}}.{{schema}}.seed
select * from {{target.database}}.{{schema}}.seed

{% endarchive %}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
updated_at='updated_at',
)
}}
select * from {{database}}.{{schema}}.seed where last_name = 'Castillo'
select * from {{target.database}}.{{schema}}.seed where last_name = 'Castillo'

{% endarchive %}

Expand All @@ -24,7 +24,7 @@
updated_at='updated_at',
)
}}
select * from {{database}}.{{schema}}.seed where last_name = 'Alvarez'
select * from {{target.database}}.{{schema}}.seed where last_name = 'Alvarez'

{% endarchive %}

Expand All @@ -40,6 +40,6 @@
updated_at='updated_at',
)
}}
select * from {{database}}.{{schema}}.seed where last_name = 'Kelly'
select * from {{target.database}}.{{schema}}.seed where last_name = 'Kelly'

{% endarchive %}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
check_cols=('email',),
)
}}
select * from `{{database}}`.`{{schema}}`.seed
select * from `{{target.database}}`.`{{schema}}`.seed
{% endarchive %}


Expand All @@ -23,5 +23,5 @@
check_cols='all',
)
}}
select * from `{{database}}`.`{{schema}}`.seed
select * from `{{target.database}}`.`{{schema}}`.seed
{% endarchive %}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
check_cols=['email'],
)
}}
select * from {{database}}.{{schema}}.seed
select * from {{target.database}}.{{schema}}.seed

{% endarchive %}

Expand All @@ -24,5 +24,5 @@
check_cols='all',
)
}}
select * from {{database}}.{{schema}}.seed
select * from {{target.database}}.{{schema}}.seed
{% endarchive %}
4 changes: 3 additions & 1 deletion test/integration/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -939,7 +939,9 @@ def _assertTableColumnsEqual(self, relation_a, relation_b):

text_types = {'text', 'character varying', 'character', 'varchar'}

self.assertEqual(len(table_a_result), len(table_b_result))
self.assertEqual(len(table_a_result), len(table_b_result),
"{} vs. {}".format(table_a_result, table_b_result))

for a_column, b_column in zip(table_a_result, table_b_result):
a_name, a_type, a_size = a_column
b_name, b_type, b_size = b_column
Expand Down

0 comments on commit e6f7284

Please sign in to comment.