Skip to content

Commit

Permalink
fixed materialization for stage to become create if not exists
Browse files Browse the repository at this point in the history
added in external tables capabilities
  • Loading branch information
jonhopper-dataengineers committed Aug 16, 2022
1 parent 3e2f7f9 commit a22ea42
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 5 deletions.
8 changes: 4 additions & 4 deletions macros/source_tables/get_source_build_plan.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
{% if object_type == 'stream' %}
{{ return(adapter.dispatch('get_stream_build_plan', 'dbt_dataengineers_materilizations')(source_node)) }}
{% elif object_type == 'external' %}
{{ return(adapter.dispatch('get_external_source_build_plan', 'dbt_dataengineers_materilizations')(source_node)) }}
{{ return(adapter.dispatch('get_external_build_plan', 'dbt_dataengineers_materilizations')(source_node)) }}
{% else %}
{{ return(adapter.dispatch('get_source_build_plan', 'dbt_dataengineers_materilizations')(source_node, is_first_run)) }}
{% endif %}
{% endmacro %}

{% macro default__get_source_build_plan(source_node, is_first_run) %}
{{ exceptions.raise_compiler_error("Staging sources is not implemented for the default adapter") }}
{% endmacro %}


{% macro default__get_stream_build_plan(source_node) %}
{{ exceptions.raise_compiler_error("Staging sources is not implemented for the default adapter") }}
{{ exceptions.raise_compiler_error("Staging steams is not implemented for the default adapter") }}
{% endmacro %}


{% macro default__get_external_source_build_plan(source_node) %}
{% macro default__get_external_build_plan(source_node) %}
{{ exceptions.raise_compiler_error("Staging external sources is not implemented for the default adapter") }}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{% macro snowflake__create_external_table(source_node) %}

{%- set columns = source_node.columns.values() -%}
{%- set external = source_node.external -%}
{%- set partitions = external.partitions -%}

{%- set is_csv = dbt_external_tables.is_csv(external.file_format) -%}

{# https://docs.snowflake.net/manuals/sql-reference/sql/create-external-table.html #}
{# This assumes you have already created an external stage #}
CREATE OR REPLACE EXTERNAL TABLE {{source(source_node.source_name, source_node.name)}}
(
file_name VARCHAR(500) AS metadata$filename,
load_date TIMESTAMP_LTZ(7) AS current_timestamp{{- ',' if partitions or columns|length > 0 -}}
{%- if columns or partitions -%}
{%- if partitions -%}{%- for partition in partitions %}
{{partition.name}} {{partition.data_type}} AS {{partition.expression}}{{- ',' if not loop.last or columns|length > 0 -}}
{%- endfor -%}{%- endif -%}
{%- for column in columns %}
{%- set column_quoted = adapter.quote(column.name) if column.quote else column.name %}
{%- set col_expression -%}
{%- set col_id = 'value:c' ~ loop.index if is_csv else 'value:' ~ column_quoted -%}
(case when is_null_value({{col_id}}) or lower({{col_id}}) = 'null' then null else {{col_id}} end)
{%- endset %}
{{column_quoted}} {{column.data_type}} AS ({{col_expression}}::{{column.data_type}})
{{- ',' if not loop.last -}}
{% endfor %}
{%- endif -%}
)
{% if partitions %} PARTITION BY ({{partitions|map(attribute='name')|join(', ')}}) {% endif %}
LOCATION = {{external.location}} {# stage #}
{% if external.auto_refresh in (true, false) -%}
AUTO_REFRESH = {{external.auto_refresh}}
{%- endif %}
{% if external.pattern -%} PATTERN = '{{external.pattern}}' {%- endif %}
{% if external.integration -%} INTEGRATION = '{{external.integration}}' {%- endif %}
FILE_FORMAT = {{external.file_format}}
{% endmacro %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{% macro snowflake__get_external_build_plan(source_node) %}

{% set build_plan = [] %}

{% set old_relation = adapter.get_relation(
database = source_node.database,
schema = source_node.schema,
identifier = source_node.identifier
) %}

{% set create_or_replace = (old_relation is none or var('ext_full_refresh', false)) %}

{% if source_node.external.get('snowpipe', none) is not none %}

{% if create_or_replace %}
{% set build_plan = build_plan + [
dbt_external_tables.snowflake_create_empty_table(source_node),
dbt_external_tables.snowflake_get_copy_sql(source_node, explicit_transaction=true),
dbt_external_tables.snowflake_create_snowpipe(source_node)
] %}
{% else %}
{% set build_plan = build_plan + dbt_external_tables.snowflake_refresh_snowpipe(source_node) %}
{% endif %}

{% else %}

{% if create_or_replace %}
{% set build_plan = build_plan + [dbt_dataengineers_materilizations.snowflake__create_external_table(source_node)] %}
{% else %}
{% set build_plan = build_plan + dbt_external_tables.refresh_external_table(source_node) %}
{% endif %}

{% endif %}

{% do return(build_plan) %}

{% endmacro %}
1 change: 1 addition & 0 deletions macros/source_tables/stage_table_sources.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
{# Initial run to cater for #}
{% do dbt_dataengineers_materilizations.stage_table_sources_plans(sources_to_stage, true, 'internal') %}
{% do dbt_dataengineers_materilizations.stage_table_sources_plans(sources_to_stage, false, 'internal') %}
{% do dbt_dataengineers_materilizations.stage_table_sources_plans(externals_tables_to_stage, false, 'external') %}

{% endif %}
{% endmacro %}
Expand Down
2 changes: 1 addition & 1 deletion macros/stages/snowflake/snowflake__stage.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

-- build model
{%- call statement('main') -%}
{{ dbt_dataengineers_materilizations.snowflake_create_stages_statement(target_relation, sql) }}
{{ dbt_dataengineers_materilizations.snowflake_create_stages_if_not_exist_statement(target_relation, sql) }}
{%- endcall -%}

--------------------------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
{%- macro snowflake_create_stages_if_not_exist_statement(relation, sql) -%}

{{ log("Creating stages " ~ relation) }}
CREATE STAGE IF NOT EXISTS {{ relation.include(database=(not temporary), schema=(not temporary)) }}
{{ sql }}
;

{%- endmacro -%}
3 changes: 3 additions & 0 deletions packages.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
packages:
- package: dbt-labs/dbt_external_tables
version: 0.8.0

0 comments on commit a22ea42

Please sign in to comment.