Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
drewbanin committed May 22, 2019
1 parent e9b85af commit 8a00110
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 120 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,3 @@
{#
Create SCD Hash SQL fields cross-db
#}

{% macro archive_hash_arguments(args) %}
{{ adapter_macro('archive_hash_arguments', args) }}
{% endmacro %}

{% macro default__archive_hash_arguments(args) %}
md5({% for arg in args %}coalesce(cast({{ arg }} as varchar ), '') {% if not loop.last %} || '|' || {% endif %}{% endfor %})
{% endmacro %}

{#
Add new columns to the table if applicable
#}
Expand All @@ -25,77 +13,6 @@
{% endfor %}
{% endmacro %}

{% macro archive_get_time() -%}
{{ adapter_macro('archive_get_time') }}
{%- endmacro %}

{% macro default__archive_get_time() -%}
{{ current_timestamp() }}
{%- endmacro %}

{#-- TODO : This doesn't belong here #}
{% macro snowflake__archive_get_time() -%}
to_timestamp_ntz({{ current_timestamp() }})
{%- endmacro %}

{% macro timestamp_strategy(archived_rel, current_rel, config) %}

{% set updated_at = config['updated_at'] %}
{% set row_changed_expr -%}
({{ archived_rel }}.{{ updated_at }} < {{ current_rel }}.{{ updated_at }})
{%- endset %}

-- TODO : Use real macro here....
{% set primary_key = config['unique_key'] %}
{% set scd_id_expr %}
md5({{ primary_key }} || {{ updated_at }}::text)
{% endset %}


{% do return({
"unique_key": primary_key,
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr
}) %}
{% endmacro %}

{% macro check_col_strategy(archived_rel, current_rel, config) %}
{% set check_cols = config['check_cols'] %}

{# TODO
{% if check_cols == 'all' %}
{% set check_cols = source_columns | map(attribute='name') | list %}
#}
{% set updated_at = archive_get_time() %}

{% set row_changed_expr -%}
(
{% for col in check_cols %}
{{ archived_rel }}.{{ col }} != {{ current_rel }}.{{ col }}
{%- if not loop.last %} or {% endif %}
{% endfor %}
)
{%- endset %}

{% set primary_key = config['unique_key'] %}
{% set scd_id_expr %}
md5(
concat({% for col in check_cols %}
cast({{ col }} as text),
'-' {% if not loop.last %} , {% endif %}
{% endfor %})
)
{% endset %}

{% do return({
"unique_key": primary_key,
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr
}) %}
{% endmacro %}


{% macro archive_update_sql(strategy, source_sql, target_relation, source_columns) -%}

Expand Down Expand Up @@ -142,6 +59,7 @@

{%- endmacro %}


{% macro archive_insert_sql(strategy, source_sql, target_relation, source_columns) -%}

with archive_query as (
Expand Down Expand Up @@ -205,6 +123,7 @@

{% endmacro %}


{% macro get_or_create_relation(database, schema, identifier, type) %}
{%- set target_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) %}

Expand All @@ -222,6 +141,30 @@
{% endmacro %}


{% macro build_staging_relation(target_relation, strategy, sql) %}

{# TODO : Be smarter about database/schema names for temp tables! #}
{%- set tmp_relation = api.Relation.create(
identifier=target_relation.identifier ~ "__dbt_tmp") -%}

{% set insert_sql = archive_insert_sql(strategy, sql, target_relation) %}
{% set update_sql = archive_update_sql(strategy, sql, target_relation) %}

{% call statement('build_staging_relation') %}
{{ create_table_as(True, tmp_relation, insert_sql) }}

insert into {{ tmp_relation }} (dbt_change_type, dbt_scd_id, dbt_valid_to)
select dbt_change_type, dbt_scd_id, dbt_valid_to
from (
{{ update_sql }}
) as sbq
{% endcall %}

{{ return(tmp_relation) }}

{% endmacro %}


{% materialization archive, default %}
{%- set config = model['config'] -%}

Expand All @@ -246,14 +189,7 @@
{% do exceptions.relation_wrong_type(target_relation, 'table') %}
{%- endif -%}

{% if strategy_name == 'timestamp' %}
{% set strategy_macro = timestamp_strategy %}
{% elif strategy_name == 'check' %}
{% set strategy_macro = check_col_strategy %}
{% else %}
{{ exceptions.raise_compiler_error('Got invalid strategy "{}"'.format(strategy_name)) }}
{% endif %}

{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro("archived_data", "source_data", config) %}

{% if not target_relation_exists %}
Expand All @@ -267,41 +203,27 @@

{{ adapter.valid_archive_target(target_relation) }}

{# TODO : Be smarter about database/schema names for temp tables! #}
{%- set tmp_relation = api.Relation.create(
identifier=target_table ~ "__dbt_tmp") -%}
{% set tmp_relation = build_staging_relation(target_relation, strategy, model['injected_sql']) %}
{% set source_columns = adapter.get_columns_in_relation(tmp_relation) %}

{% set insert_sql = archive_insert_sql(strategy, model['injected_sql'], target_relation) %}

{% call statement('gen_updates') %}
{{ create_table_as(True, tmp_relation, insert_sql) }}
{% endcall %}

{% call statement('gen_updates') %}
{% set update_sql = archive_update_sql(strategy, model['injected_sql'], target_relation) %}
insert into {{ tmp_relation }} (dbt_change_type, dbt_scd_id, dbt_valid_to)
select
dbt_change_type,
dbt_scd_id,
dbt_valid_to
{# TODO : Make this take a relation #}
{% do adapter.expand_target_column_types(temp_table=target_table ~ "__dbt_tmp",
to_relation=target_relation) %}

from (
{{ update_sql }}
) as sbq
{% endcall %}
{% set missing_columns = adapter.get_missing_columns(tmp_relation, target_relation)
| rejectattr("name", "equalto", "dbt_change_type")
| list %}

{%- set source_columns = adapter.get_columns_in_relation(tmp_relation) -%}
{# TODO : Make this take a relation #}
{{ adapter.expand_target_column_types(temp_table=target_table ~ "__dbt_tmp",
to_relation=target_relation) }}
{% set dest_columns = source_columns
| rejectattr("name", "equalto", "dbt_change_type")
| list %}

{% set missing_columns = adapter.get_missing_columns(tmp_relation, target_relation) %}
{% set missing_columns = missing_columns | rejectattr("name", "equalto", "dbt_change_type") | list %}
{{ create_columns(target_relation, missing_columns) }}
{% do create_columns(target_relation, missing_columns) %}

{% set dest_columns = source_columns | rejectattr("name", "equalto", "dbt_change_type") | list %}
{% set merge_on -%}
DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id and DBT_INTERNAL_DEST.dbt_valid_to is null
{%- endset %}

{% set merge_on = 'DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id and DBT_INTERNAL_DEST.dbt_valid_to is null' %}
{% set merge_when = [
{
"type": "matched",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
{#
Dispatch strategies by name, optionally qualified to a package
#}
{% macro strategy_dispatch(name) -%}
{% set original_name = name %}
{% if '.' in name %}
{% set package_name, name = name.split(".", 1) %}
{% else %}
{% set package_name = none %}
{% endif %}

{% if package_name is none %}
{% set package_context = context %}
{% elif package_name in context %}
{% set package_context = context[package_name] %}
{% else %}
{% set error_msg %}
Could not find package '{{package_name}}', called with '{{original_name}}'
{% endset %}
{{ exceptions.raise_compiler_error(error_msg | trim) }}
{% endif %}

{%- set search_name = 'archive_' ~ name ~ '_strategy' -%}

{% if search_name not in package_context %}
{% set error_msg %}
The specified strategy macro '{{name}}' was not found in package '{{ package_name }}'
{% endset %}
{{ exceptions.raise_compiler_error(error_msg | trim) }}
{% endif %}
{{ return(package_context[search_name]) }}
{%- endmacro %}


{#
Create SCD Hash SQL fields cross-db
#}
{% macro archive_hash_arguments(args) %}
{{ adapter_macro('archive_hash_arguments', args) }}
{% endmacro %}


{% macro default__archive_hash_arguments(args) %}
md5({% for arg in args %}
coalesce(cast({{ arg }} as varchar ), '') {% if not loop.last %} || '|' || {% endif %}
{% endfor %})
{% endmacro %}


{#
Get the current time cross-db
#}
{% macro archive_get_time() -%}
{{ adapter_macro('archive_get_time') }}
{%- endmacro %}

{% macro default__archive_get_time() -%}
{{ current_timestamp() }}
{%- endmacro %}

{#-- TODO : This doesn't belong here #}
{% macro snowflake__archive_get_time() -%}
to_timestamp_ntz({{ current_timestamp() }})
{%- endmacro %}


{#
Core strategy definitions
#}
{% macro archive_timestamp_strategy(archived_rel, current_rel, config) %}
{% set primary_key = config['unique_key'] %}
{% set updated_at = config['updated_at'] %}

{% set row_changed_expr -%}
({{ archived_rel }}.{{ updated_at }} < {{ current_rel }}.{{ updated_at }})
{%- endset %}

{% set scd_id_expr = archive_hash_arguments([primary_key, updated_at]) %}

{% do return({
"unique_key": primary_key,
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr
}) %}
{% endmacro %}


{% macro archive_check_strategy(archived_rel, current_rel, config) %}
{% set primary_key = config['unique_key'] %}
{% set check_cols = config['check_cols'] %}
{% set updated_at = archive_get_time() %}

{% set row_changed_expr -%}
(
{% for col in check_cols %}
{{ archived_rel }}.{{ col }} != {{ current_rel }}.{{ col }}
{%- if not loop.last %} or {% endif %}
{% endfor %}
)
{%- endset %}

{% set scd_id_expr = archive_hash_arguments(check_cols) %}

{% do return({
"unique_key": primary_key,
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr
}) %}
{% endmacro %}

0 comments on commit 8a00110

Please sign in to comment.