Skip to content

Commit

Permalink
added in build plans for stages
Browse files Browse the repository at this point in the history
  • Loading branch information
jonhopper-dataengineers committed Aug 16, 2022
1 parent 660e39b commit 6c92af9
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 0 deletions.
7 changes: 7 additions & 0 deletions macros/stages/get_stage_build_plan.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{% macro get_stage_build_plan(source_node) %}
{{ return(adapter.dispatch('get_stage_build_plan', 'dbt_dataengineers_materilizations')(source_node)) }}
{% endmacro %}

{% macro default__get_stage_build_plan(source_node) %}
{{ exceptions.raise_compiler_error("Staging stages is not implemented for the default adapter") }}
{% endmacro %}
19 changes: 19 additions & 0 deletions macros/stages/snowflake/snowflake__get_stage_build_plan.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{% macro snowflake__get_stage_build_plan(source_node) %}

{% set build_plan = [] %}

{% if source_node.config.materialized == 'stage' %}
{% set stage_relation = api.Relation.create(
database = source_node.database,
schema = source_node.schema,
identifier = source_node.name
) %}

{% set sql = render(source_node.get('raw_sql')) %}
{% set build_plan = build_plan + [dbt_dataengineers_materilizations.snowflake_create_stages_statement(stage_relation, sql)] %}


{% endif %}
{% do return(build_plan) %}

{% endmacro %}
File renamed without changes.
27 changes: 27 additions & 0 deletions macros/stages/source_tables.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
version: 2
macros:

- name: stage_stages
description: Creates a build execution plan to action
docs:
show: false

- name: stage_stages_plans
docs:
show: false

- name: get_stage_build_plan
docs:
show: false

- name: default__get_stage_build_plan
docs:
show: false

- name: snowflake__get_stage_build_plan
docs:
show: false

- name: snowflake_create_stages_statement
docs:
show: false
41 changes: 41 additions & 0 deletions macros/stages/stage_stages.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
{% macro stage_stages() %}
{% if flags.WHICH == 'run' or flags.WHICH == 'run-operation' %}
{% set stages_to_stage = [] %}

{% set nodes = graph.nodes.values() if graph.nodes else [] %}
{% for node in nodes %}
{% if node.config.materialized == 'stage' %}
{% do stages_to_stage.append(node) %}
{% endif %}
{% endfor %}

{% do log('stages to create: ' ~ stages_to_stage|length, info = true) %}

{# Initial run to cater for #}
{% do dbt_dataengineers_materilizations.stage_stages_plans(stages_to_stage) %}


{% endif %}
{% endmacro %}


{% macro stage_stages_plans(items_to_stage) %}
{% for node in items_to_stage %}
{% set loop_label = loop.index ~ ' of ' ~ loop.length %}
{% do log(loop_label ~ ' START stage creation ' ~ node.schema ~ '.' ~ node.identifier, info = true) -%}

{% set run_queue = dbt_dataengineers_materilizations.get_stage_build_plan(node) %}
{% do log(loop_label ~ ' SKIP stage ' ~ node.schema ~ '.' ~ node.identifier, info = true) if run_queue == [] %}

{% set width = flags.PRINTER_WIDTH %}
{% for cmd in run_queue %}
{# do log(loop_label ~ ' ' ~ cmd, info = true) #}
{% call statement('runner', fetch_result = True, auto_begin = False) %}
{{ cmd }}
{% endcall %}
{% set runner = load_result('runner') %}
{% set log_msg = runner['response'] if 'response' in runner.keys() else runner['status'] %}
{% do log(loop_label ~ ' ' ~ log_msg ~ ' stage model ' ~ node.schema ~ '.' ~ node.identifier, info = true) %}
{% endfor %}
{% endfor %}
{% endmacro %}

0 comments on commit 6c92af9

Please sign in to comment.