From 7a8ae8bc702ec1fa09532aa2820b933e392eb11f Mon Sep 17 00:00:00 2001 From: Gemma Down Date: Wed, 13 Sep 2023 12:04:56 +0100 Subject: [PATCH 01/15] Add named columns --- macros/get_column_name_lists.sql | 241 ++++++++++++++++++++++++++ macros/insert_into_metadata_table.sql | 34 ++-- macros/upload_results.sql | 22 +++ 3 files changed, 283 insertions(+), 14 deletions(-) create mode 100644 macros/get_column_name_lists.sql diff --git a/macros/get_column_name_lists.sql b/macros/get_column_name_lists.sql new file mode 100644 index 00000000..050974cb --- /dev/null +++ b/macros/get_column_name_lists.sql @@ -0,0 +1,241 @@ + +{# + These are the column lists used as part of the upload macros - the order here should be the same + as the order in each individual `upload_dataset` macro. +#} + +{% macro get_column_name_list(dataset) -%} + + {# Exposures #} + {% if dataset == 'exposures' %} + + ( + command_invocation_id, + node_id, + run_started_at, + name, + type, + owner, + maturity, + path, + description, + url, + package_name, + depends_on_nodes, + tags, + all_results + ) + + {# Invocations #} + {% elif dataset == 'invocations' %} + + ( + command_invocation_id, + dbt_version, + project_name, + run_started_at, + dbt_command, + full_refresh_flag, + target_profile_name, + target_name, + target_schema, + target_threads, + dbt_cloud_project_id, + dbt_cloud_job_id, + dbt_cloud_run_id, + dbt_cloud_run_reason_category, + dbt_cloud_run_reason, + env_vars, + dbt_vars, + invocation_args, + dbt_custom_envs + ) + + {# Model Executions #} + {% elif dataset == 'model_executions' %} + + ( + node_id, + command_invocation_id, + run_started_at, + was_full_refresh, + thread_id, + status, + compile_started_at, + query_completed_at, + total_node_runtime, + rows_affected, + {% if target.type == 'bigquery' %} + bytes_processed, + {% endif %} + materialization, + schema, + name, + alias, + message, + adapter_response + ) + + {# Models #} + {% elif dataset == 'models' %} + + ( + node_id, + 
command_invocation_id, + run_started_at, + database, + schema, + name, + depends_on_nodes, + package_name, + path, + checksum, + materialization, + tags, + meta, + alias, + all_results + ) + + + {# Seed Executions #} + {% elif dataset == 'seed_executions' %} + + ( + command_invocation_id, + node_id, + run_started_at, + was_full_refresh, + thread_id, + status, + compile_started_at, + query_completed_at, + total_node_runtime, + rows_affected, + materialization, + schema, + name, + alias, + message, + adapter_response + ) + + {# Seeds #} + {% elif dataset == 'seeds' %} + + ( + command_invocation_id, + node_id, + run_started_at, + database, + schema, + name, + package_name, + path, + checksum, + meta, + alias, + all_results + ) + + {# Snapshot Executions #} + {% elif dataset == 'snapshot_executions' %} + + ( + command_invocation_id, + node_id, + run_started_at, + was_full_refresh, + thread_id, + status, + compile_started_at, + query_completed_at, + total_node_runtime, + rows_affected, + materialization, + schema, + name, + alias, + message, + adapter_response + ) + + {# Snapshots #} + {% elif dataset == 'snapshots' %} + + ( + command_invocation_id, + node_id, + run_started_at, + database, + schema, + name, + depends_on_nodes, + package_name, + path, + checksum, + strategy, + meta, + alias, + all_results + ) + + {# Sources #} + {% elif dataset == 'sources' %} + + ( + command_invocation_id, + node_id, + run_started_at, + database, + schema, + source_name, + loader, + name, + identifier, + loaded_at_field, + freshness, + all_results + ) + + {# Test Executions #} + {% elif dataset == 'test_executions' %} + + ( + command_invocation_id, + node_id, + run_started_at, + was_full_refresh, + thread_id, + status, + compile_started_at, + query_completed_at, + total_node_runtime, + rows_affected, + failures, + message, + adapter_response + ) + + {# Tests #} + {% elif dataset == 'tests' %} + + ( + command_invocation_id, + node_id, + run_started_at, + name, + depends_on_nodes, + 
package_name, + test_path, + tags, + all_results + ) + + {% else %} + + /* No column list available */ + + {% endif %} + +{%- endmacro %} diff --git a/macros/insert_into_metadata_table.sql b/macros/insert_into_metadata_table.sql index 235d2326..37994249 100644 --- a/macros/insert_into_metadata_table.sql +++ b/macros/insert_into_metadata_table.sql @@ -1,38 +1,44 @@ -{% macro insert_into_metadata_table(database_name, schema_name, table_name, content) -%} +{% macro insert_into_metadata_table(database_name, schema_name, table_name, fields, content) -%} + {% if content != "" %} - {{ return(adapter.dispatch('insert_into_metadata_table', 'dbt_artifacts')(database_name, schema_name, table_name, content)) }} + {{ return(adapter.dispatch('insert_into_metadata_table', 'dbt_artifacts')(database_name, schema_name, table_name, fields, content)) }} {% endif %} + {%- endmacro %} -{% macro spark__insert_into_metadata_table(database_name, schema_name, table_name, content) -%} +{% macro spark__insert_into_metadata_table(database_name, schema_name, table_name, fields, content) -%} + {% set insert_into_table_query %} - insert into {% if database_name %}{{ database_name }}.{% endif %}{{ schema_name }}.{{ table_name }} + insert into {% if database_name %}{{ database_name }}.{% endif %}{{ schema_name }}.{{ table_name }} {{ fields }} {{ content }} {% endset %} {% do run_query(insert_into_table_query) %} + {%- endmacro %} -{% macro snowflake__insert_into_metadata_table(database_name, schema_name, table_name, content) -%} +{% macro snowflake__insert_into_metadata_table(database_name, schema_name, table_name, fields, content) -%} + {% set insert_into_table_query %} - insert into {{database_name}}.{{ schema_name }}.{{ table_name }} + insert into {{database_name}}.{{ schema_name }}.{{ table_name }} {{ fields }} {{ content }} {% endset %} {% do run_query(insert_into_table_query) %} + {%- endmacro %} -{% macro bigquery__insert_into_metadata_table(database_name, schema_name, table_name, content) 
-%} +{% macro bigquery__insert_into_metadata_table(database_name, schema_name, table_name, fields, content) -%} - {% set insert_into_table_query %} - insert into `{{database_name}}.{{ schema_name }}.{{ table_name }}` - VALUES - {{ content }} - {% endset %} + {% set insert_into_table_query %} + insert into `{{database_name}}.{{ schema_name }}.{{ table_name }}` {{ fields }} + values + {{ content }} + {% endset %} - {% do run_query(insert_into_table_query) %} + {% do run_query(insert_into_table_query) %} {%- endmacro %} -{% macro default__insert_into_metadata_table(database_name, schema_name, table_name, content) -%} +{% macro default__insert_into_metadata_table(database_name, schema_name, table_name, fields, content) -%} {%- endmacro %} diff --git a/macros/upload_results.sql b/macros/upload_results.sql index dbfb8051..7a0f208d 100644 --- a/macros/upload_results.sql +++ b/macros/upload_results.sql @@ -22,10 +22,12 @@ {% do log("Uploading model executions", true) %} {% set model_executions = dbt_artifacts.get_relation('model_executions') %} {% set content_model_executions = dbt_artifacts.upload_model_executions(results) %} + {% set fields_model_executions = dbt_artifacts.get_column_name_list('model_executions') %} {{ dbt_artifacts.insert_into_metadata_table( database_name=model_executions.database, schema_name=model_executions.schema, table_name=model_executions.identifier, + fields=fields_model_executions, content=content_model_executions ) }} @@ -33,10 +35,12 @@ {% do log("Uploading seed executions", true) %} {% set seed_executions = dbt_artifacts.get_relation('seed_executions') %} {% set content_seed_executions = dbt_artifacts.upload_seed_executions(results) %} + {% set fields_seed_executions = dbt_artifacts.get_column_name_list('seed_executions') %} {{ dbt_artifacts.insert_into_metadata_table( database_name=seed_executions.database, schema_name=seed_executions.schema, table_name=seed_executions.identifier, + fields=fields_seed_executions, 
content=content_seed_executions ) }} @@ -44,10 +48,12 @@ {% do log("Uploading snapshot executions", true) %} {% set snapshot_executions = dbt_artifacts.get_relation('snapshot_executions') %} {% set content_snapshot_executions = dbt_artifacts.upload_snapshot_executions(results) %} + {% set fields_snapshot_executions = dbt_artifacts.get_column_name_list('snapshot_executions') %} {{ dbt_artifacts.insert_into_metadata_table( database_name=snapshot_executions.database, schema_name=snapshot_executions.schema, table_name=snapshot_executions.identifier, + fields=fields_snapshot_executions, content=content_snapshot_executions ) }} @@ -55,10 +61,12 @@ {% do log("Uploading test executions", true) %} {% set test_executions = dbt_artifacts.get_relation('test_executions') %} {% set content_test_executions = dbt_artifacts.upload_test_executions(results) %} + {% set fields_test_executions = dbt_artifacts.get_column_name_list('test_executions') %} {{ dbt_artifacts.insert_into_metadata_table( database_name=test_executions.database, schema_name=test_executions.schema, table_name=test_executions.identifier, + fields=fields_test_executions, content=content_test_executions ) }} @@ -68,10 +76,12 @@ {% do log("Uploading exposures", true) %} {% set exposures = dbt_artifacts.get_relation('exposures') %} {% set content_exposures = dbt_artifacts.upload_exposures(graph) %} + {% set fields_exposures = dbt_artifacts.get_column_name_list('exposures') %} {{ dbt_artifacts.insert_into_metadata_table( database_name=exposures.database, schema_name=exposures.schema, table_name=exposures.identifier, + fields=fields_exposures, content=content_exposures ) }} @@ -82,6 +92,7 @@ {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "test") %} {% do tests_set.append(node) %} {% endfor %} + {% set fields_tests = dbt_artifacts.get_column_name_list('tests') %} {# upload tests in chunks of 5000 tests (300 for BigQuery), or less #} {% set upload_limit = 300 if target.type == 'bigquery' else 
5000 %} {% for i in range(0, tests_set | length, upload_limit) -%} @@ -90,6 +101,7 @@ database_name=tests.database, schema_name=tests.schema, table_name=tests.identifier, + fields=fields_tests, content=content_tests ) }} @@ -97,11 +109,13 @@ {% do log("Uploading seeds", true) %} {% set seeds = dbt_artifacts.get_relation('seeds') %} + {% set fields_seeds = dbt_artifacts.get_column_name_list('seeds') %} {% set content_seeds = dbt_artifacts.upload_seeds(graph) %} {{ dbt_artifacts.insert_into_metadata_table( database_name=seeds.database, schema_name=seeds.schema, table_name=seeds.identifier, + fields=fields_seeds, content=content_seeds ) }} @@ -112,6 +126,7 @@ {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") %} {% do models_set.append(node) %} {% endfor %} + {% set fields_models = dbt_artifacts.get_column_name_list('models') %} {% set upload_limit = 50 if target.type == 'bigquery' else 100 %} {% for i in range(0, models_set | length, upload_limit) -%} {% set content_models = dbt_artifacts.upload_models(models_set[i: i + upload_limit]) %} @@ -119,6 +134,7 @@ database_name=models.database, schema_name=models.schema, table_name=models.identifier, + fields=fields_models, content=content_models ) }} @@ -130,6 +146,7 @@ {% for node in graph.sources.values() %} {% do sources_set.append(node) %} {% endfor %} + {% set fields_sources = dbt_artifacts.get_column_name_list('sources') %} {# upload sources in chunks of 5000 sources (300 for BigQuery), or less #} {% set upload_limit = 300 if target.type == 'bigquery' else 5000 %} {% for i in range(0, sources_set | length, upload_limit) -%} @@ -138,6 +155,7 @@ database_name=sources.database, schema_name=sources.schema, table_name=sources.identifier, + fields=fields_sources, content=content_sources ) }} @@ -145,22 +163,26 @@ {% do log("Uploading snapshots", true) %} {% set snapshots = dbt_artifacts.get_relation('snapshots') %} + {% set fields_snapshots = 
dbt_artifacts.get_column_name_list('snapshots') %} {% set content_snapshots = dbt_artifacts.upload_snapshots(graph) %} {{ dbt_artifacts.insert_into_metadata_table( database_name=snapshots.database, schema_name=snapshots.schema, table_name=snapshots.identifier, + fields=fields_snapshots, content=content_snapshots ) }} {% do log("Uploading invocations", true) %} {% set invocations = dbt_artifacts.get_relation('invocations') %} + {% set fields_invocations = dbt_artifacts.get_column_name_list('invocations') %} {% set content_invocations = dbt_artifacts.upload_invocations() %} {{ dbt_artifacts.insert_into_metadata_table( database_name=invocations.database, schema_name=invocations.schema, table_name=invocations.identifier, + fields=fields_invocations, content=content_invocations ) }} From 9182fdcb281362d04831371636757846cef2c577 Mon Sep 17 00:00:00 2001 From: Gemma Down Date: Wed, 13 Sep 2023 12:13:44 +0100 Subject: [PATCH 02/15] Remove unused `rows_affected` column in `test_executions` #195 --- macros/get_column_name_lists.sql | 1 - macros/upload_test_executions.sql | 5 +---- models/fct_dbt__test_executions.sql | 1 - models/fct_dbt__test_executions.yml | 2 -- models/sources/test_executions.sql | 1 - models/sources/test_executions.yml | 2 -- models/staging/stg_dbt__test_executions.sql | 1 - models/staging/stg_dbt__test_executions.yml | 2 -- 8 files changed, 1 insertion(+), 14 deletions(-) diff --git a/macros/get_column_name_lists.sql b/macros/get_column_name_lists.sql index 050974cb..eb107e33 100644 --- a/macros/get_column_name_lists.sql +++ b/macros/get_column_name_lists.sql @@ -211,7 +211,6 @@ compile_started_at, query_completed_at, total_node_runtime, - rows_affected, failures, message, adapter_response diff --git a/macros/upload_test_executions.sql b/macros/upload_test_executions.sql index a8747099..a1aecbf5 100644 --- a/macros/upload_test_executions.sql +++ b/macros/upload_test_executions.sql @@ -23,8 +23,7 @@ {{ adapter.dispatch('column_identifier', 
'dbt_artifacts')(9) }}, {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(10) }}, {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(11) }}, - {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(12) }}, - {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(13)) }} + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(12)) }} from values {% for test in tests -%} ( @@ -63,7 +62,6 @@ {% endif %} {{ test.execution_time }}, {# total_node_runtime #} - null, {# rows_affected not available in Databricks #} {{ 'null' if test.failures is none else test.failures }}, {# failures #} '{{ test.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}', {# message #} '{{ tojson(test.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# adapter_response #} @@ -117,7 +115,6 @@ {% endif %} {{ test.execution_time }}, {# total_node_runtime #} - null, {# rows_affected not available in Databricks #} {{ 'null' if test.failures is none else test.failures }}, {# failures #} '{{ test.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') | replace("\n", "\\n") }}', {# message #} {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(test.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"')) }} {# adapter_response #} diff --git a/models/fct_dbt__test_executions.sql b/models/fct_dbt__test_executions.sql index b921df43..5e227af5 100644 --- a/models/fct_dbt__test_executions.sql +++ b/models/fct_dbt__test_executions.sql @@ -18,7 +18,6 @@ test_executions as ( compile_started_at, query_completed_at, total_node_runtime, - rows_affected, failures, message from base diff --git a/models/fct_dbt__test_executions.yml b/models/fct_dbt__test_executions.yml index 35cae002..6541a7ff 100644 --- a/models/fct_dbt__test_executions.yml +++ 
b/models/fct_dbt__test_executions.yml @@ -14,8 +14,6 @@ models: description: '{{ doc("node_id") }}' - name: query_completed_at description: '{{ doc("query_completed_at") }}' - - name: rows_affected - description: '{{ doc("rows_affected") }}' - name: status description: '{{ doc("status") }}' - name: test_execution_id diff --git a/models/sources/test_executions.sql b/models/sources/test_executions.sql index 50d37980..4f0e19f7 100644 --- a/models/sources/test_executions.sql +++ b/models/sources/test_executions.sql @@ -13,7 +13,6 @@ select cast(null as {{ type_timestamp() }}) as compile_started_at, cast(null as {{ type_timestamp() }}) as query_completed_at, cast(null as {{ type_float() }}) as total_node_runtime, - cast(null as {{ type_int() }}) as rows_affected, cast(null as {{ type_int() }}) as failures, cast(null as {{ type_string() }}) as message, cast(null as {{ type_json() }}) as adapter_response diff --git a/models/sources/test_executions.yml b/models/sources/test_executions.yml index e851c7c5..4449a37d 100644 --- a/models/sources/test_executions.yml +++ b/models/sources/test_executions.yml @@ -22,8 +22,6 @@ models: description: '{{ doc("query_completed_at") }}' - name: total_node_runtime description: '{{ doc("total_node_runtime") }}' - - name: rows_affected - description: '{{ doc("rows_affected") }}' - name: failures description: '{{ doc("failures") }}' - name: message diff --git a/models/staging/stg_dbt__test_executions.sql b/models/staging/stg_dbt__test_executions.sql index ca7ea868..9ace3de5 100644 --- a/models/staging/stg_dbt__test_executions.sql +++ b/models/staging/stg_dbt__test_executions.sql @@ -18,7 +18,6 @@ enhanced as ( compile_started_at, query_completed_at, total_node_runtime, - rows_affected, failures, message from base diff --git a/models/staging/stg_dbt__test_executions.yml b/models/staging/stg_dbt__test_executions.yml index 93d62fe4..aab7cfd2 100644 --- a/models/staging/stg_dbt__test_executions.yml +++ 
b/models/staging/stg_dbt__test_executions.yml @@ -14,8 +14,6 @@ models: description: '{{ doc("node_id") }}' - name: query_completed_at description: '{{ doc("query_completed_at") }}' - - name: rows_affected - description: '{{ doc("rows_affected") }}' - name: run_started_at description: '{{ doc("run_started_at") }}' - name: status From f691014eda3b2cd71fb4aac85fbd7961ea2555bc Mon Sep 17 00:00:00 2001 From: Gemma Down Date: Wed, 13 Sep 2023 12:18:19 +0100 Subject: [PATCH 03/15] Reorganise macro folder --- .../column_identifier.sql | 0 .../generate_surrogate_key.sql | 0 macros/database_specific_helpers/get_relation.sql | 14 ++++++++++++++ .../{ => database_specific_helpers}/parse_json.sql | 0 .../type_helpers.sql | 8 ++++---- macros/{ => migration}/migrate_from_v0_to_v1.sql | 0 .../upload_exposures.sql | 0 .../upload_invocations.sql | 0 .../upload_model_executions.sql | 0 .../upload_models.sql | 0 .../upload_seed_executions.sql | 0 .../upload_seeds.sql | 0 .../upload_snapshot_executions.sql | 0 .../upload_snapshots.sql | 0 .../upload_sources.sql | 0 .../upload_test_executions.sql | 0 .../upload_tests.sql | 0 .../{ => upload_results}/get_column_name_lists.sql | 0 .../insert_into_metadata_table.sql | 0 macros/{ => upload_results}/upload_results.sql | 14 -------------- 20 files changed, 18 insertions(+), 18 deletions(-) rename macros/{ => database_specific_helpers}/column_identifier.sql (100%) rename macros/{ => database_specific_helpers}/generate_surrogate_key.sql (100%) create mode 100644 macros/database_specific_helpers/get_relation.sql rename macros/{ => database_specific_helpers}/parse_json.sql (100%) rename macros/{ => database_specific_helpers}/type_helpers.sql (95%) rename macros/{ => migration}/migrate_from_v0_to_v1.sql (100%) rename macros/{ => upload_individual_datasets}/upload_exposures.sql (100%) rename macros/{ => upload_individual_datasets}/upload_invocations.sql (100%) rename macros/{ => upload_individual_datasets}/upload_model_executions.sql (100%) rename 
macros/{ => upload_individual_datasets}/upload_models.sql (100%) rename macros/{ => upload_individual_datasets}/upload_seed_executions.sql (100%) rename macros/{ => upload_individual_datasets}/upload_seeds.sql (100%) rename macros/{ => upload_individual_datasets}/upload_snapshot_executions.sql (100%) rename macros/{ => upload_individual_datasets}/upload_snapshots.sql (100%) rename macros/{ => upload_individual_datasets}/upload_sources.sql (100%) rename macros/{ => upload_individual_datasets}/upload_test_executions.sql (100%) rename macros/{ => upload_individual_datasets}/upload_tests.sql (100%) rename macros/{ => upload_results}/get_column_name_lists.sql (100%) rename macros/{ => upload_results}/insert_into_metadata_table.sql (100%) rename macros/{ => upload_results}/upload_results.sql (93%) diff --git a/macros/column_identifier.sql b/macros/database_specific_helpers/column_identifier.sql similarity index 100% rename from macros/column_identifier.sql rename to macros/database_specific_helpers/column_identifier.sql diff --git a/macros/generate_surrogate_key.sql b/macros/database_specific_helpers/generate_surrogate_key.sql similarity index 100% rename from macros/generate_surrogate_key.sql rename to macros/database_specific_helpers/generate_surrogate_key.sql diff --git a/macros/database_specific_helpers/get_relation.sql b/macros/database_specific_helpers/get_relation.sql new file mode 100644 index 00000000..5954032d --- /dev/null +++ b/macros/database_specific_helpers/get_relation.sql @@ -0,0 +1,14 @@ +{% macro get_relation(get_relation_name) %} + {% if execute %} + {% set model_get_relation_node = graph.nodes.values() | selectattr('name', 'equalto', get_relation_name) | first %} + {% set relation = api.Relation.create( + database = model_get_relation_node.database, + schema = model_get_relation_node.schema, + identifier = model_get_relation_node.alias + ) + %} + {% do return(relation) %} + {% else %} + {% do return(api.Relation.create()) %} + {% endif %} +{% 
endmacro %} diff --git a/macros/parse_json.sql b/macros/database_specific_helpers/parse_json.sql similarity index 100% rename from macros/parse_json.sql rename to macros/database_specific_helpers/parse_json.sql diff --git a/macros/type_helpers.sql b/macros/database_specific_helpers/type_helpers.sql similarity index 95% rename from macros/type_helpers.sql rename to macros/database_specific_helpers/type_helpers.sql index 19c3a718..4064ad46 100644 --- a/macros/type_helpers.sql +++ b/macros/database_specific_helpers/type_helpers.sql @@ -19,11 +19,11 @@ {% endmacro %} {% macro snowflake__type_json() %} - OBJECT + object {% endmacro %} {% macro bigquery__type_json() %} - JSON + json {% endmacro %} {#- ARRAY -#} @@ -37,9 +37,9 @@ {% endmacro %} {% macro snowflake__type_array() %} - ARRAY + array {% endmacro %} {% macro bigquery__type_array() %} - ARRAY + array {% endmacro %} diff --git a/macros/migrate_from_v0_to_v1.sql b/macros/migration/migrate_from_v0_to_v1.sql similarity index 100% rename from macros/migrate_from_v0_to_v1.sql rename to macros/migration/migrate_from_v0_to_v1.sql diff --git a/macros/upload_exposures.sql b/macros/upload_individual_datasets/upload_exposures.sql similarity index 100% rename from macros/upload_exposures.sql rename to macros/upload_individual_datasets/upload_exposures.sql diff --git a/macros/upload_invocations.sql b/macros/upload_individual_datasets/upload_invocations.sql similarity index 100% rename from macros/upload_invocations.sql rename to macros/upload_individual_datasets/upload_invocations.sql diff --git a/macros/upload_model_executions.sql b/macros/upload_individual_datasets/upload_model_executions.sql similarity index 100% rename from macros/upload_model_executions.sql rename to macros/upload_individual_datasets/upload_model_executions.sql diff --git a/macros/upload_models.sql b/macros/upload_individual_datasets/upload_models.sql similarity index 100% rename from macros/upload_models.sql rename to 
macros/upload_individual_datasets/upload_models.sql diff --git a/macros/upload_seed_executions.sql b/macros/upload_individual_datasets/upload_seed_executions.sql similarity index 100% rename from macros/upload_seed_executions.sql rename to macros/upload_individual_datasets/upload_seed_executions.sql diff --git a/macros/upload_seeds.sql b/macros/upload_individual_datasets/upload_seeds.sql similarity index 100% rename from macros/upload_seeds.sql rename to macros/upload_individual_datasets/upload_seeds.sql diff --git a/macros/upload_snapshot_executions.sql b/macros/upload_individual_datasets/upload_snapshot_executions.sql similarity index 100% rename from macros/upload_snapshot_executions.sql rename to macros/upload_individual_datasets/upload_snapshot_executions.sql diff --git a/macros/upload_snapshots.sql b/macros/upload_individual_datasets/upload_snapshots.sql similarity index 100% rename from macros/upload_snapshots.sql rename to macros/upload_individual_datasets/upload_snapshots.sql diff --git a/macros/upload_sources.sql b/macros/upload_individual_datasets/upload_sources.sql similarity index 100% rename from macros/upload_sources.sql rename to macros/upload_individual_datasets/upload_sources.sql diff --git a/macros/upload_test_executions.sql b/macros/upload_individual_datasets/upload_test_executions.sql similarity index 100% rename from macros/upload_test_executions.sql rename to macros/upload_individual_datasets/upload_test_executions.sql diff --git a/macros/upload_tests.sql b/macros/upload_individual_datasets/upload_tests.sql similarity index 100% rename from macros/upload_tests.sql rename to macros/upload_individual_datasets/upload_tests.sql diff --git a/macros/get_column_name_lists.sql b/macros/upload_results/get_column_name_lists.sql similarity index 100% rename from macros/get_column_name_lists.sql rename to macros/upload_results/get_column_name_lists.sql diff --git a/macros/insert_into_metadata_table.sql 
b/macros/upload_results/insert_into_metadata_table.sql similarity index 100% rename from macros/insert_into_metadata_table.sql rename to macros/upload_results/insert_into_metadata_table.sql diff --git a/macros/upload_results.sql b/macros/upload_results/upload_results.sql similarity index 93% rename from macros/upload_results.sql rename to macros/upload_results/upload_results.sql index 7a0f208d..8b07ec5d 100644 --- a/macros/upload_results.sql +++ b/macros/upload_results/upload_results.sql @@ -1,18 +1,4 @@ {# dbt doesn't like us ref'ing in an operation so we fetch the info from the graph #} -{% macro get_relation(get_relation_name) %} - {% if execute %} - {% set model_get_relation_node = graph.nodes.values() | selectattr('name', 'equalto', get_relation_name) | first %} - {% set relation = api.Relation.create( - database = model_get_relation_node.database, - schema = model_get_relation_node.schema, - identifier = model_get_relation_node.alias - ) - %} - {% do return(relation) %} - {% else %} - {% do return(api.Relation.create()) %} - {% endif %} -{% endmacro %} {% macro upload_results(results) -%} From 305879d7fbadf811fca10f14479dafe234842120 Mon Sep 17 00:00:00 2001 From: Gemma Down Date: Wed, 13 Sep 2023 12:47:46 +0100 Subject: [PATCH 04/15] Add documentation for macros --- macros/_macros.yml | 224 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 224 insertions(+) create mode 100644 macros/_macros.yml diff --git a/macros/_macros.yml b/macros/_macros.yml new file mode 100644 index 00000000..70921d63 --- /dev/null +++ b/macros/_macros.yml @@ -0,0 +1,224 @@ +version: 2 + +macros: + ## DATABASE SPECIFIC HELPERS ## + - name: column_identifier + description: | + Dependent on the adapter type, return the identifier for a column using a numerical index. 
+ arguments: + - name: column_index + type: integer + description: | + The index of the column to return the identifier for + + - name: generate_surrogate_key + description: | + Since folks commonly install dbt_artifacts alongside a myriad of other packages, + we copy the dbt_utils implementation of the surrogate_key macro so we don't have + any dependencies to make conflicts worse! + + This version is: + URL: https://github.com/dbt-labs/dbt-utils/blob/main/macros/sql/generate_surrogate_key.sql + Commit SHA: eaa0e41b033bdf252eff0ae014ec11888f37ebff + Date: 2023-04-28 + arguments: + - name: field_list + type: list + description: | + A list of fields to concatenate together to form the surrogate key + + - name: get_relation + description: | + Identify a relation in the graph from a relation name + arguments: + - name: get_relation_name + type: string + description: | + The name of the relation to return from the graph + + - name: parse_json + description: | + Dependent on the adapter type, return a column which parses the JSON field. + arguments: + - name: field + type: string + description: | + The name of the field to parse + + - name: type_array + description: | + Dependent on the adapter type, returns the native type for storing an array. + + - name: type_boolean + description: | + Dependent on the adapter type, returns the native boolean type. + + - name: type_json + description: | + Dependent on the adapter type, returns the native type for storing JSON. + + ## MIGRATION ## + - name: migrate_from_v0_to_v1 + description: | + A macro to assist with migrating from v0 to v1 of dbt_artifacts. See + https://github.com/brooklyn-data/dbt_artifacts/blob/main/README.md#migrating-from-100-to-100 + for details on the usage. 
+ arguments: + - name: old_database + type: string + description: | + The database of the <1.0.0 output (fct_/dim_) models - does not have to be different to `new_database` + - name: old_schema + type: string + description: | + The schema of the <1.0.0 output (fct_/dim_) models - does not have to be different to `new_schema` + - name: new_database + type: string + description: | + The target database that the v1 artifact sources are in - does not have to be different to `old_database` + - name: new_schema + type: string + description: | + The target schema that the v1 artifact sources are in - does not have to be different to `old_schema` + + ## UPLOAD INDIVIDUAL DATASETS ## + - name: upload_exposures + description: | + The macro to support upload of the data to the exposures table. + arguments: + - name: graph + type: object + description: | + The graph object from dbt. + + - name: upload_invocations + description: | + The macro to support upload of the data to the invocations table. + + - name: upload_model_executions + description: | + The macro to support upload of the data to the model_executions table. + arguments: + - name: results + type: list + description: | + The results object from dbt. + + - name: upload_models + description: | + The macro to support upload of the data to the models table. + arguments: + - name: models + type: list + description: | + A list of test objects extracted from the dbt graph + + - name: upload_seed_executions + description: | + The macro to support upload of the data to the seed_executions table. + arguments: + - name: results + type: list + description: | + The results object from dbt. + + - name: upload_seeds + description: | + The macro to support upload of the data to the seeds table. + arguments: + - name: graph + type: object + description: | + The graph object from dbt. + + - name: upload_snapshot_executions + description: | + The macro to support upload of the data to the snapshot_executions table. 
+ arguments: + - name: results + type: list + description: | + The results object from dbt. + + - name: upload_snapshots + description: | + The macro to support upload of the data to the snapshots table. + arguments: + - name: graph + type: object + description: | + The graph object from dbt. + + - name: upload_sources + description: | + The macro to support upload of the data to the sources table. + arguments: + - name: sources + type: list + description: | + A list of sources objects extracted from the dbt graph + + - name: upload_test_executions + description: | + The macro to support upload of the data to the test_executions table. + arguments: + - name: results + type: list + description: | + The results object from dbt. + + - name: upload_tests + description: | + The macro to support upload of the data to the tests table. + arguments: + - name: tests + type: list + description: | + A list of test objects extracted from the dbt graph + + ## UPLOAD RESULTS ## + - name: get_column_name_lists + description: | + A macro to return the list of column names for a particular dataset. Returns a comment if the dataset is not + valid. + arguments: + - name: dataset + type: string + description: | + The name of the dataset to return the column names for e.g. `models` + + - name: insert_into_metadata_table + description: | + Dependent on the adapter type, the wrapper to insert the data into a table from a list of values. Used in the + `upload_results` macro, alongside the `get_column_lists` macro to generate the column names and the + `upload_dataset` macros to generate the data to be inserted. 
+ arguments: + - name: database_name + type: string + description: | + The database name for the relation that the data is to be inserted into + - name: schema_name + type: string + description: | + The schema name for the relation that the data is to be inserted into + - name: table_name + type: string + description: | + The table name for the relation that the data is to be inserted into + - name: fields + type: string + description: | + The list of fields for the relation that the data is to be inserted into + - name: content + type: string + description: | + The data content to insert into the relation + + - name: upload_results + description: | + The main macro called to upload the metadata into each of the source tables. + arguments: + - name: results + type: list + description: | + The results object from dbt. From beb057a92e1147eedbfd27fe8bece71964165d79 Mon Sep 17 00:00:00 2001 From: Gemma Down Date: Wed, 13 Sep 2023 14:12:22 +0100 Subject: [PATCH 05/15] Consolidate logic in upload results --- macros/upload_results/upload_results.sql | 188 +++++++++-------------- 1 file changed, 72 insertions(+), 116 deletions(-) diff --git a/macros/upload_results/upload_results.sql b/macros/upload_results/upload_results.sql index 8b07ec5d..f4374d67 100644 --- a/macros/upload_results/upload_results.sql +++ b/macros/upload_results/upload_results.sql @@ -4,73 +4,88 @@ {% if execute %} + {% set standard_datasets = ['exposures', 'seeds', 'snapshots', 'invocations'] %} {% if results != [] %} - {% do log("Uploading model executions", true) %} - {% set model_executions = dbt_artifacts.get_relation('model_executions') %} - {% set content_model_executions = dbt_artifacts.upload_model_executions(results) %} - {% set fields_model_executions = dbt_artifacts.get_column_name_list('model_executions') %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=model_executions.database, - schema_name=model_executions.schema, - table_name=model_executions.identifier, - 
fields=fields_model_executions, - content=content_model_executions - ) - }} + {# When executing, and results are available, then upload the results #} + {% set standard_datasets = ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions'] + standard_datasets %} + {% endif %} - {% do log("Uploading seed executions", true) %} - {% set seed_executions = dbt_artifacts.get_relation('seed_executions') %} - {% set content_seed_executions = dbt_artifacts.upload_seed_executions(results) %} - {% set fields_seed_executions = dbt_artifacts.get_column_name_list('seed_executions') %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=seed_executions.database, - schema_name=seed_executions.schema, - table_name=seed_executions.identifier, - fields=fields_seed_executions, - content=content_seed_executions - ) - }} + {# Upload each data set in turn #} + {% for dataset in standard_datasets %} + + {% do log("Uploading " ~ dataset.replace("_", ""), true) %} + {% set dataset_relation = dbt_artifacts.get_relation(dataset) %} - {% do log("Uploading snapshot executions", true) %} - {% set snapshot_executions = dbt_artifacts.get_relation('snapshot_executions') %} - {% set content_snapshot_executions = dbt_artifacts.upload_snapshot_executions(results) %} - {% set fields_snapshot_executions = dbt_artifacts.get_column_name_list('snapshot_executions') %} + {% set content %} + + {# Executions make use of the results object #} + {% if dataset == 'model_executions' %} + {{ dbt_artifacts.upload_model_executions(results) }} + {% elif dataset == 'seed_executions' %} + {{ dbt_artifacts.upload_seed_executions(results) }} + {% elif dataset == 'test_executions' %} + {{ dbt_artifacts.upload_test_executions(results) }} + {% elif dataset == 'snapshot_executions' %} + {{ dbt_artifacts.upload_snapshot_executions(results) }} + {# + Use the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details about + the exposures, seeds, snapshots and 
invocations + #} + {% elif dataset == 'exposures' %} + {{ dbt_artifacts.upload_exposures(graph) }} + {% elif dataset == 'seeds' %} + {{ dbt_artifacts.upload_seeds(graph) }} + {% elif dataset == 'snapshots' %} + {{ dbt_artifacts.upload_snapshots(graph) }} + {# Invocations only requires data from variables available in the macro #} + {% elif dataset == 'invocations' %} + {{ dbt_artifacts.upload_invocations() }} + {% endif %} + + {% endset %} + + {# Insert the content into the metadata table #} {{ dbt_artifacts.insert_into_metadata_table( - database_name=snapshot_executions.database, - schema_name=snapshot_executions.schema, - table_name=snapshot_executions.identifier, - fields=fields_snapshot_executions, - content=content_snapshot_executions + database_name=dataset_relation.database, + schema_name=dataset_relation.schema, + table_name=dataset_relation.identifier, + fields=dbt_artifacts.get_column_name_list(dataset), + content=content ) }} - {% do log("Uploading test executions", true) %} - {% set test_executions = dbt_artifacts.get_relation('test_executions') %} - {% set content_test_executions = dbt_artifacts.upload_test_executions(results) %} - {% set fields_test_executions = dbt_artifacts.get_column_name_list('test_executions') %} + {% endfor %} + + + {# + We can also use a similar approach for sources, but we want to reduce the number uploaded each time + #} + + {% do log("Uploading sources", true) %} + {% set sources = dbt_artifacts.get_relation('sources') %} + {% set sources_set = [] %} + {% for node in graph.sources.values() %} + {% do sources_set.append(node) %} + {% endfor %} + {% set fields_sources = dbt_artifacts.get_column_name_list('sources') %} + {# upload sources in chunks of 5000 sources (300 for BigQuery), or less #} + {% set upload_limit = 300 if target.type == 'bigquery' else 5000 %} + {% for i in range(0, sources_set | length, upload_limit) -%} + {% set content_sources = dbt_artifacts.upload_sources(sources_set[i: i + upload_limit]) %} {{ 
dbt_artifacts.insert_into_metadata_table( - database_name=test_executions.database, - schema_name=test_executions.schema, - table_name=test_executions.identifier, - fields=fields_test_executions, - content=content_test_executions + database_name=sources.database, + schema_name=sources.schema, + table_name=sources.identifier, + fields=fields_sources, + content=content_sources ) }} + {%- endfor %} - {% endif %} - - {% do log("Uploading exposures", true) %} - {% set exposures = dbt_artifacts.get_relation('exposures') %} - {% set content_exposures = dbt_artifacts.upload_exposures(graph) %} - {% set fields_exposures = dbt_artifacts.get_column_name_list('exposures') %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=exposures.database, - schema_name=exposures.schema, - table_name=exposures.identifier, - fields=fields_exposures, - content=content_exposures - ) - }} + {# + Use the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details about + the tests, models and sources - need to look through the nodes and select the ones we want + #} {% do log("Uploading tests", true) %} {% set tests = dbt_artifacts.get_relation('tests') %} @@ -93,19 +108,6 @@ }} {%- endfor %} - {% do log("Uploading seeds", true) %} - {% set seeds = dbt_artifacts.get_relation('seeds') %} - {% set fields_seeds = dbt_artifacts.get_column_name_list('seeds') %} - {% set content_seeds = dbt_artifacts.upload_seeds(graph) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=seeds.database, - schema_name=seeds.schema, - table_name=seeds.identifier, - fields=fields_seeds, - content=content_seeds - ) - }} - {% do log("Uploading models", true) %} {% set models = dbt_artifacts.get_relation('models') %} {% set models_set = [] %} @@ -113,6 +115,7 @@ {% do models_set.append(node) %} {% endfor %} {% set fields_models = dbt_artifacts.get_column_name_list('models') %} + {# upload tests in chunks of 100 models (50 for BigQuery), or less #} {% set upload_limit = 
50 if target.type == 'bigquery' else 100 %} {% for i in range(0, models_set | length, upload_limit) -%} {% set content_models = dbt_artifacts.upload_models(models_set[i: i + upload_limit]) %} @@ -126,52 +129,5 @@ }} {%- endfor %} - {% do log("Uploading sources", true) %} - {% set sources = dbt_artifacts.get_relation('sources') %} - {% set sources_set = [] %} - {% for node in graph.sources.values() %} - {% do sources_set.append(node) %} - {% endfor %} - {% set fields_sources = dbt_artifacts.get_column_name_list('sources') %} - {# upload sources in chunks of 5000 sources (300 for BigQuery), or less #} - {% set upload_limit = 300 if target.type == 'bigquery' else 5000 %} - {% for i in range(0, sources_set | length, upload_limit) -%} - {% set content_sources = dbt_artifacts.upload_sources(sources_set[i: i + upload_limit]) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=sources.database, - schema_name=sources.schema, - table_name=sources.identifier, - fields=fields_sources, - content=content_sources - ) - }} - {%- endfor %} - - {% do log("Uploading snapshots", true) %} - {% set snapshots = dbt_artifacts.get_relation('snapshots') %} - {% set fields_snapshots = dbt_artifacts.get_column_name_list('snapshots') %} - {% set content_snapshots = dbt_artifacts.upload_snapshots(graph) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=snapshots.database, - schema_name=snapshots.schema, - table_name=snapshots.identifier, - fields=fields_snapshots, - content=content_snapshots - ) - }} - - {% do log("Uploading invocations", true) %} - {% set invocations = dbt_artifacts.get_relation('invocations') %} - {% set fields_invocations = dbt_artifacts.get_column_name_list('invocations') %} - {% set content_invocations = dbt_artifacts.upload_invocations() %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=invocations.database, - schema_name=invocations.schema, - table_name=invocations.identifier, - fields=fields_invocations, - 
content=content_invocations - ) - }} - {% endif %} {%- endmacro %} From 720343910331556b85dfd6cadb6329c71bc7f080 Mon Sep 17 00:00:00 2001 From: Gemma Down Date: Wed, 13 Sep 2023 14:31:56 +0100 Subject: [PATCH 06/15] Simplify how nodes are extracted --- macros/_macros.yml | 36 +++++++++---------- .../upload_exposures.sql | 6 +--- .../upload_model_executions.sql | 8 +---- .../upload_seed_executions.sql | 8 +---- .../upload_seeds.sql | 6 +--- .../upload_snapshot_executions.sql | 8 +---- .../upload_snapshots.sql | 7 ++-- .../upload_test_executions.sql | 8 +---- macros/upload_results/upload_results.sql | 29 ++++++--------- 9 files changed, 36 insertions(+), 80 deletions(-) diff --git a/macros/_macros.yml b/macros/_macros.yml index 70921d63..6ef273aa 100644 --- a/macros/_macros.yml +++ b/macros/_macros.yml @@ -86,10 +86,10 @@ macros: description: | The macro to support upload of the data to the exposures table. arguments: - - name: graph - type: object + - name: exposures + type: list description: | - The graph object from dbt. + A list of exposure objects extracted from the dbt graph - name: upload_invocations description: | @@ -99,10 +99,10 @@ macros: description: | The macro to support upload of the data to the model_executions table. arguments: - - name: results + - name: models type: list description: | - The results object from dbt. + A list of model execution results objects extracted from the dbt result object - name: upload_models description: | @@ -117,37 +117,37 @@ macros: description: | The macro to support upload of the data to the seed_executions table. arguments: - - name: results + - name: seeds type: list description: | - The results object from dbt. + A list of seed execution results objects extracted from the dbt result object - name: upload_seeds description: | The macro to support upload of the data to the seeds table. arguments: - - name: graph - type: object + - name: seeds + type: list description: | - The graph object from dbt. 
+ A list of seeds objects extracted from the dbt graph - name: upload_snapshot_executions description: | The macro to support upload of the data to the snapshot_executions table. arguments: - - name: results + - name: snapshots type: list description: | - The results object from dbt. + A list of snapshot execution results objects extracted from the dbt result object - name: upload_snapshots description: | The macro to support upload of the data to the snapshots table. arguments: - - name: graph - type: object + - name: snapshots + type: list description: | - The graph object from dbt. + A list of snapshots objects extracted from the dbt graph - name: upload_sources description: | @@ -162,10 +162,10 @@ macros: description: | The macro to support upload of the data to the test_executions table. arguments: - - name: results + - name: tests type: list description: | - The results object from dbt. + A list of test execution results objects extracted from the dbt result object - name: upload_tests description: | @@ -177,7 +177,7 @@ macros: A list of test objects extracted from the dbt graph ## UPLOAD RESULTS ## - - name: get_column_name_lists + - name: get_column_name_list description: | A macro to return the list of column names for a particular dataset. Returns a comment if the dataset is not valid. 
diff --git a/macros/upload_individual_datasets/upload_exposures.sql b/macros/upload_individual_datasets/upload_exposures.sql index d8653154..4f3b51d1 100644 --- a/macros/upload_individual_datasets/upload_exposures.sql +++ b/macros/upload_individual_datasets/upload_exposures.sql @@ -1,8 +1,4 @@ -{% macro upload_exposures(graph) -%} - {% set exposures = [] %} - {% for node in graph.exposures.values() %} - {% do exposures.append(node) %} - {% endfor %} +{% macro upload_exposures(exposures) -%} {{ return(adapter.dispatch('get_exposures_dml_sql', 'dbt_artifacts')(exposures)) }} {%- endmacro %} diff --git a/macros/upload_individual_datasets/upload_model_executions.sql b/macros/upload_individual_datasets/upload_model_executions.sql index a8ebbe89..3ff87cc5 100644 --- a/macros/upload_individual_datasets/upload_model_executions.sql +++ b/macros/upload_individual_datasets/upload_model_executions.sql @@ -1,10 +1,4 @@ -{% macro upload_model_executions(results) -%} - {% set models = [] %} - {% for result in results %} - {% if result.node.resource_type == "model" %} - {% do models.append(result) %} - {% endif %} - {% endfor %} +{% macro upload_model_executions(models) -%} {{ return(adapter.dispatch('get_model_executions_dml_sql', 'dbt_artifacts')(models)) }} {%- endmacro %} diff --git a/macros/upload_individual_datasets/upload_seed_executions.sql b/macros/upload_individual_datasets/upload_seed_executions.sql index 4be58b09..69c4540c 100644 --- a/macros/upload_individual_datasets/upload_seed_executions.sql +++ b/macros/upload_individual_datasets/upload_seed_executions.sql @@ -1,10 +1,4 @@ -{% macro upload_seed_executions(results) -%} - {% set seeds = [] %} - {% for result in results %} - {% if result.node.resource_type == "seed" %} - {% do seeds.append(result) %} - {% endif %} - {% endfor %} +{% macro upload_seed_executions(seeds) -%} {{ return(adapter.dispatch('get_seed_executions_dml_sql', 'dbt_artifacts')(seeds)) }} {%- endmacro %} diff --git 
a/macros/upload_individual_datasets/upload_seeds.sql b/macros/upload_individual_datasets/upload_seeds.sql index 8158bf84..f9d3ac06 100644 --- a/macros/upload_individual_datasets/upload_seeds.sql +++ b/macros/upload_individual_datasets/upload_seeds.sql @@ -1,8 +1,4 @@ -{% macro upload_seeds(graph) -%} - {% set seeds = [] %} - {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "seed") %} - {% do seeds.append(node) %} - {% endfor %} +{% macro upload_seeds(seeds) -%} {{ return(adapter.dispatch('get_seeds_dml_sql', 'dbt_artifacts')(seeds)) }} {%- endmacro %} diff --git a/macros/upload_individual_datasets/upload_snapshot_executions.sql b/macros/upload_individual_datasets/upload_snapshot_executions.sql index 3e8c03d0..a9980b18 100644 --- a/macros/upload_individual_datasets/upload_snapshot_executions.sql +++ b/macros/upload_individual_datasets/upload_snapshot_executions.sql @@ -1,10 +1,4 @@ -{% macro upload_snapshot_executions(results) -%} - {% set snapshots = [] %} - {% for result in results %} - {% if result.node.resource_type == "snapshot" %} - {% do snapshots.append(result) %} - {% endif %} - {% endfor %} +{% macro upload_snapshot_executions(snapshots) -%} {{ return(adapter.dispatch('get_snapshot_executions_dml_sql', 'dbt_artifacts')(snapshots)) }} {%- endmacro %} diff --git a/macros/upload_individual_datasets/upload_snapshots.sql b/macros/upload_individual_datasets/upload_snapshots.sql index 477f46a6..ce45aa19 100644 --- a/macros/upload_individual_datasets/upload_snapshots.sql +++ b/macros/upload_individual_datasets/upload_snapshots.sql @@ -1,8 +1,5 @@ -{% macro upload_snapshots(graph) -%} - {% set snapshots = [] %} - {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "snapshot") %} - {% do snapshots.append(node) %} - {% endfor %} +{% macro upload_snapshots(snapshots) -%} + {{ return(adapter.dispatch('get_snapshots_dml_sql', 'dbt_artifacts')(snapshots)) }} {%- endmacro %} diff --git 
a/macros/upload_individual_datasets/upload_test_executions.sql b/macros/upload_individual_datasets/upload_test_executions.sql index a1aecbf5..84ef217b 100644 --- a/macros/upload_individual_datasets/upload_test_executions.sql +++ b/macros/upload_individual_datasets/upload_test_executions.sql @@ -1,10 +1,4 @@ -{% macro upload_test_executions(results) -%} - {% set tests = [] %} - {% for result in results %} - {% if result.node.resource_type == "test" %} - {% do tests.append(result) %} - {% endif %} - {% endfor %} +{% macro upload_test_executions(tests) -%} {{ return(adapter.dispatch('get_test_executions_dml_sql', 'dbt_artifacts')(tests)) }} {%- endmacro %} diff --git a/macros/upload_results/upload_results.sql b/macros/upload_results/upload_results.sql index f4374d67..4fd81ef0 100644 --- a/macros/upload_results/upload_results.sql +++ b/macros/upload_results/upload_results.sql @@ -20,23 +20,23 @@ {# Executions make use of the results object #} {% if dataset == 'model_executions' %} - {{ dbt_artifacts.upload_model_executions(results) }} + {{ dbt_artifacts.upload_model_executions(results | selectattr("node.resource_type", "equalto", "model") | list) }} {% elif dataset == 'seed_executions' %} - {{ dbt_artifacts.upload_seed_executions(results) }} + {{ dbt_artifacts.upload_seed_executions(results | selectattr("node.resource_type", "equalto", "seed") | list) }} {% elif dataset == 'test_executions' %} - {{ dbt_artifacts.upload_test_executions(results) }} + {{ dbt_artifacts.upload_test_executions(results | selectattr("node.resource_type", "equalto", "test") | list) }} {% elif dataset == 'snapshot_executions' %} - {{ dbt_artifacts.upload_snapshot_executions(results) }} + {{ dbt_artifacts.upload_snapshot_executions(results | selectattr("node.resource_type", "equalto", "snapshot") | list) }} {# Use the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details about the exposures, seeds, snapshots and invocations #} {% elif dataset == 'exposures' %} - 
{{ dbt_artifacts.upload_exposures(graph) }} + {{ dbt_artifacts.upload_exposures(graph.exposures.values() | list) }} {% elif dataset == 'seeds' %} - {{ dbt_artifacts.upload_seeds(graph) }} + {{ dbt_artifacts.upload_seeds(graph.nodes.values() | selectattr("resource_type", "equalto", "seed") | list) }} {% elif dataset == 'snapshots' %} - {{ dbt_artifacts.upload_snapshots(graph) }} + {{ dbt_artifacts.upload_snapshots(graph.nodes.values() | selectattr("resource_type", "equalto", "snapshot") | list) }} {# Invocations only requires data from variables available in the macro #} {% elif dataset == 'invocations' %} {{ dbt_artifacts.upload_invocations() }} @@ -63,10 +63,7 @@ {% do log("Uploading sources", true) %} {% set sources = dbt_artifacts.get_relation('sources') %} - {% set sources_set = [] %} - {% for node in graph.sources.values() %} - {% do sources_set.append(node) %} - {% endfor %} + {% set sources_set = graph.sources.values() | list %} {% set fields_sources = dbt_artifacts.get_column_name_list('sources') %} {# upload sources in chunks of 5000 sources (300 for BigQuery), or less #} {% set upload_limit = 300 if target.type == 'bigquery' else 5000 %} @@ -89,10 +86,7 @@ {% do log("Uploading tests", true) %} {% set tests = dbt_artifacts.get_relation('tests') %} - {% set tests_set = [] %} - {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "test") %} - {% do tests_set.append(node) %} - {% endfor %} + {% set tests_set = graph.nodes.values() | selectattr("resource_type", "equalto", "test") | list %} {% set fields_tests = dbt_artifacts.get_column_name_list('tests') %} {# upload tests in chunks of 5000 tests (300 for BigQuery), or less #} {% set upload_limit = 300 if target.type == 'bigquery' else 5000 %} @@ -110,10 +104,7 @@ {% do log("Uploading models", true) %} {% set models = dbt_artifacts.get_relation('models') %} - {% set models_set = [] %} - {% for node in graph.nodes.values() | selectattr("resource_type", "equalto", "model") %} - {% do 
models_set.append(node) %} - {% endfor %} + {% set models_set = graph.nodes.values() | selectattr("resource_type", "equalto", "model") | list %} {% set fields_models = dbt_artifacts.get_column_name_list('models') %} {# upload tests in chunks of 100 models (50 for BigQuery), or less #} {% set upload_limit = 50 if target.type == 'bigquery' else 100 %} From 066ab2195809ea1f667863fc4468c760eeb235a3 Mon Sep 17 00:00:00 2001 From: Gemma Down Date: Wed, 13 Sep 2023 14:37:59 +0100 Subject: [PATCH 07/15] Generalise identifying objects to load further --- macros/upload_results/upload_results.sql | 36 ++++++++++++++++-------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/macros/upload_results/upload_results.sql b/macros/upload_results/upload_results.sql index 4fd81ef0..c6e860b1 100644 --- a/macros/upload_results/upload_results.sql +++ b/macros/upload_results/upload_results.sql @@ -16,27 +16,41 @@ {% do log("Uploading " ~ dataset.replace("_", ""), true) %} {% set dataset_relation = dbt_artifacts.get_relation(dataset) %} + {# Get the results that need to be uploaded #} + {% set objects %} + + {% if dataset in ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions'] %} + {{ results | selectattr("node.resource_type", "equalto", dataset.split("_")[0]) | list }} + {# + Use the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details about + the exposures, seeds, snapshots and invocations + #} + {% elif dataset in ['seeds', 'snapshots'] %} + {{ graph.nodes.values() | selectattr("resource_type", "equalto", dataset[:-1]) }} + {% elif dataset == 'exposures' %} + {{ graph.exposures.values() | list }} + {% endif %} + + {% endset %} + + {# Convert the results to data to be imported #} {% set content %} {# Executions make use of the results object #} {% if dataset == 'model_executions' %} - {{ dbt_artifacts.upload_model_executions(results | selectattr("node.resource_type", "equalto", "model") | list) }} + {{ 
dbt_artifacts.upload_model_executions(objects) }} {% elif dataset == 'seed_executions' %} - {{ dbt_artifacts.upload_seed_executions(results | selectattr("node.resource_type", "equalto", "seed") | list) }} + {{ dbt_artifacts.upload_seed_executions(objects) }} {% elif dataset == 'test_executions' %} - {{ dbt_artifacts.upload_test_executions(results | selectattr("node.resource_type", "equalto", "test") | list) }} + {{ dbt_artifacts.upload_test_executions(objects) }} {% elif dataset == 'snapshot_executions' %} - {{ dbt_artifacts.upload_snapshot_executions(results | selectattr("node.resource_type", "equalto", "snapshot") | list) }} - {# - Use the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details about - the exposures, seeds, snapshots and invocations - #} + {{ dbt_artifacts.upload_snapshot_executions(objects) }} {% elif dataset == 'exposures' %} - {{ dbt_artifacts.upload_exposures(graph.exposures.values() | list) }} + {{ dbt_artifacts.upload_exposures(objects) }} {% elif dataset == 'seeds' %} - {{ dbt_artifacts.upload_seeds(graph.nodes.values() | selectattr("resource_type", "equalto", "seed") | list) }} + {{ dbt_artifacts.upload_seeds(objects) }} {% elif dataset == 'snapshots' %} - {{ dbt_artifacts.upload_snapshots(graph.nodes.values() | selectattr("resource_type", "equalto", "snapshot") | list) }} + {{ dbt_artifacts.upload_snapshots(objects) }} {# Invocations only requires data from variables available in the macro #} {% elif dataset == 'invocations' %} {{ dbt_artifacts.upload_invocations() }} From 3b680b88d0fe0cf39862d116363adebd08444654 Mon Sep 17 00:00:00 2001 From: Gemma Down Date: Wed, 13 Sep 2023 14:57:07 +0100 Subject: [PATCH 08/15] Have all results included in the loop --- macros/upload_results/upload_results.sql | 196 +++++++++-------------- 1 file changed, 80 insertions(+), 116 deletions(-) diff --git a/macros/upload_results/upload_results.sql b/macros/upload_results/upload_results.sql index c6e860b1..dc8c116d 100644 --- 
a/macros/upload_results/upload_results.sql +++ b/macros/upload_results/upload_results.sql @@ -4,135 +4,99 @@ {% if execute %} - {% set standard_datasets = ['exposures', 'seeds', 'snapshots', 'invocations'] %} + {% set datasets_to_load = ['exposures', 'seeds', 'snapshots', 'invocations', 'sources', 'tests', 'models'] %} {% if results != [] %} {# When executing, and results are available, then upload the results #} - {% set standard_datasets = ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions'] + standard_datasets %} + {% set datasets_to_load = ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions'] + datasets_to_load %} {% endif %} {# Upload each data set in turn #} - {% for dataset in standard_datasets %} + {% for dataset in datasets_to_load %} - {% do log("Uploading " ~ dataset.replace("_", ""), true) %} + {% do log("Uploading " ~ dataset.replace("_", " "), true) %} + + {# Get the relation that the results will be uploaded to #} {% set dataset_relation = dbt_artifacts.get_relation(dataset) %} {# Get the results that need to be uploaded #} - {% set objects %} - {% if dataset in ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions'] %} - {{ results | selectattr("node.resource_type", "equalto", dataset.split("_")[0]) | list }} + {% if dataset in ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions'] %} + {# Executions make use of the results object #} + {% set objects = results | selectattr("node.resource_type", "equalto", dataset.split("_")[0]) | list %} + {% elif dataset in ['seeds', 'snapshots', 'tests', 'models'] %} + {# Use the nodes in the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details #} + {% set objects = graph.nodes.values() | selectattr("resource_type", "equalto", dataset[:-1]) | list %} + {% elif dataset in ['exposures', 'sources'] %} + {# Use the 
[graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details #} + {% set objects = graph.get(dataset).values() | list %} + {% elif dataset == 'invocations' %} {# - Use the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details about - the exposures, seeds, snapshots and invocations + Invocations doesn't need anything input, but we include this so that it will still be picked up + as part of the loop below - the length must be >0 to allow for an upload, hence the empty string #} - {% elif dataset in ['seeds', 'snapshots'] %} - {{ graph.nodes.values() | selectattr("resource_type", "equalto", dataset[:-1]) }} - {% elif dataset == 'exposures' %} - {{ graph.exposures.values() | list }} - {% endif %} - - {% endset %} - - {# Convert the results to data to be imported #} - {% set content %} - - {# Executions make use of the results object #} - {% if dataset == 'model_executions' %} - {{ dbt_artifacts.upload_model_executions(objects) }} - {% elif dataset == 'seed_executions' %} - {{ dbt_artifacts.upload_seed_executions(objects) }} - {% elif dataset == 'test_executions' %} - {{ dbt_artifacts.upload_test_executions(objects) }} - {% elif dataset == 'snapshot_executions' %} - {{ dbt_artifacts.upload_snapshot_executions(objects) }} - {% elif dataset == 'exposures' %} - {{ dbt_artifacts.upload_exposures(objects) }} - {% elif dataset == 'seeds' %} - {{ dbt_artifacts.upload_seeds(objects) }} - {% elif dataset == 'snapshots' %} - {{ dbt_artifacts.upload_snapshots(objects) }} - {# Invocations only requires data from variables available in the macro #} - {% elif dataset == 'invocations' %} - {{ dbt_artifacts.upload_invocations() }} - {% endif %} - - {% endset %} - - {# Insert the content into the metadata table #} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=dataset_relation.database, - schema_name=dataset_relation.schema, - table_name=dataset_relation.identifier, - 
fields=dbt_artifacts.get_column_name_list(dataset),
-            content=content
-            )
-        }}
-
+                {% set objects = [''] %}
+            {% endif %}
+
+
+            {# Upload in chunks to reduce query size #}
+            {% if dataset == 'models' %}
+                {% set upload_limit = 50 if target.type == 'bigquery' else 100 %}
+            {% else %}
+                {% set upload_limit = 300 if target.type == 'bigquery' else 5000 %}
+            {% endif %}
+
+            {# Loop through each chunk in turn #}
+            {% for i in range(0, objects | length, upload_limit) -%}
+
+                {# Get just the objects to load on this loop #}
+                {% set objects_to_upload = objects[i: i + upload_limit] %}
+
+                {# Convert the results to data to be imported #}
+                {% set content %}
+
+                    {% if dataset == 'model_executions' %}
+                        {{ dbt_artifacts.upload_model_executions(objects_to_upload) }}
+                    {% elif dataset == 'seed_executions' %}
+                        {{ dbt_artifacts.upload_seed_executions(objects_to_upload) }}
+                    {% elif dataset == 'test_executions' %}
+                        {{ dbt_artifacts.upload_test_executions(objects_to_upload) }}
+                    {% elif dataset == 'snapshot_executions' %}
+                        {{ dbt_artifacts.upload_snapshot_executions(objects_to_upload) }}
+                    {% elif dataset == 'exposures' %}
+                        {{ dbt_artifacts.upload_exposures(objects_to_upload) }}
+                    {% elif dataset == 'models' %}
+                        {{ dbt_artifacts.upload_models(objects_to_upload) }}
+                    {% elif dataset == 'seeds' %}
+                        {{ dbt_artifacts.upload_seeds(objects_to_upload) }}
+                    {% elif dataset == 'snapshots' %}
+                        {{ dbt_artifacts.upload_snapshots(objects_to_upload) }}
+                    {% elif dataset == 'sources' %}
+                        {{ dbt_artifacts.upload_sources(objects_to_upload) }}
+                    {% elif dataset == 'tests' %}
+                        {{ dbt_artifacts.upload_tests(objects_to_upload) }}
+                    {# Invocations only requires data from variables available in the macro #}
+                    {% elif dataset == 'invocations' %}
+                        {{ dbt_artifacts.upload_invocations() }}
+                    {% endif %}
+
+                {% endset %}
+
+                {# Insert the content into the metadata table #}
+                {{ dbt_artifacts.insert_into_metadata_table(
+                    database_name=dataset_relation.database,
+                    schema_name=dataset_relation.schema,
+                    
table_name=dataset_relation.identifier, + fields=dbt_artifacts.get_column_name_list(dataset), + content=content + ) + }} + + {# Loop the next 'chunk' #} + {% endfor %} + + {# Loop the next 'dataset' #} {% endfor %} - - {# - We can also use a similar approach for sources, but we want to reduce the number uploaded each time - #} - - {% do log("Uploading sources", true) %} - {% set sources = dbt_artifacts.get_relation('sources') %} - {% set sources_set = graph.sources.values() | list %} - {% set fields_sources = dbt_artifacts.get_column_name_list('sources') %} - {# upload sources in chunks of 5000 sources (300 for BigQuery), or less #} - {% set upload_limit = 300 if target.type == 'bigquery' else 5000 %} - {% for i in range(0, sources_set | length, upload_limit) -%} - {% set content_sources = dbt_artifacts.upload_sources(sources_set[i: i + upload_limit]) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=sources.database, - schema_name=sources.schema, - table_name=sources.identifier, - fields=fields_sources, - content=content_sources - ) - }} - {%- endfor %} - - {# - Use the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details about - the tests, models and sources - need to look through the nodes and select the ones we want - #} - - {% do log("Uploading tests", true) %} - {% set tests = dbt_artifacts.get_relation('tests') %} - {% set tests_set = graph.nodes.values() | selectattr("resource_type", "equalto", "test") | list %} - {% set fields_tests = dbt_artifacts.get_column_name_list('tests') %} - {# upload tests in chunks of 5000 tests (300 for BigQuery), or less #} - {% set upload_limit = 300 if target.type == 'bigquery' else 5000 %} - {% for i in range(0, tests_set | length, upload_limit) -%} - {% set content_tests = dbt_artifacts.upload_tests(tests_set[i: i + upload_limit]) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=tests.database, - schema_name=tests.schema, - table_name=tests.identifier, - 
fields=fields_tests, - content=content_tests - ) - }} - {%- endfor %} - - {% do log("Uploading models", true) %} - {% set models = dbt_artifacts.get_relation('models') %} - {% set models_set = graph.nodes.values() | selectattr("resource_type", "equalto", "model") | list %} - {% set fields_models = dbt_artifacts.get_column_name_list('models') %} - {# upload tests in chunks of 100 models (50 for BigQuery), or less #} - {% set upload_limit = 50 if target.type == 'bigquery' else 100 %} - {% for i in range(0, models_set | length, upload_limit) -%} - {% set content_models = dbt_artifacts.upload_models(models_set[i: i + upload_limit]) %} - {{ dbt_artifacts.insert_into_metadata_table( - database_name=models.database, - schema_name=models.schema, - table_name=models.identifier, - fields=fields_models, - content=content_models - ) - }} - {%- endfor %} - {% endif %} + {%- endmacro %} From 28cc067f15de791aaa341ec73c8f1fa3f6257f21 Mon Sep 17 00:00:00 2001 From: Gemma Down Date: Wed, 13 Sep 2023 15:00:59 +0100 Subject: [PATCH 09/15] Split out get_dataset_content --- macros/_macros.yml | 9 ++++++++ macros/upload_results/get_dataset_content.sql | 22 +++++++++++++++++++ macros/upload_results/upload_results.sql | 19 +--------------- 3 files changed, 32 insertions(+), 18 deletions(-) create mode 100644 macros/upload_results/get_dataset_content.sql diff --git a/macros/_macros.yml b/macros/_macros.yml index 6ef273aa..bc0280f5 100644 --- a/macros/_macros.yml +++ b/macros/_macros.yml @@ -187,6 +187,15 @@ macros: description: | The name of the dataset to return the column names for e.g. `models` + - name: get_dataset_content + description: | + A macro to extract the data to be uploaded from either the results or the graph object. + arguments: + - name: dataset + type: string + description: | + The name of the dataset to return the data for e.g. 
`models` + - name: insert_into_metadata_table description: | Dependent on the adapter type, the wrapper to insert the data into a table from a list of values. Used in the diff --git a/macros/upload_results/get_dataset_content.sql b/macros/upload_results/get_dataset_content.sql new file mode 100644 index 00000000..b34e39cd --- /dev/null +++ b/macros/upload_results/get_dataset_content.sql @@ -0,0 +1,22 @@ +{% macro get_dataset_content(dataset) %} + + {% if dataset in ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions'] %} + {# Executions make use of the results object #} + {% set objects = results | selectattr("node.resource_type", "equalto", dataset.split("_")[0]) | list %} + {% elif dataset in ['seeds', 'snapshots', 'tests', 'models'] %} + {# Use the nodes in the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details #} + {% set objects = graph.nodes.values() | selectattr("resource_type", "equalto", dataset[:-1]) | list %} + {% elif dataset in ['exposures', 'sources'] %} + {# Use the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details #} + {% set objects = graph.get(dataset).values() | list %} + {% elif dataset == 'invocations' %} + {# + Invocations doesn't need anything input, but we include this so that it will still be picked up + as part of the loop below - the length must be >0 to allow for an upload, hence the empty string + #} + {% set objects = [''] %} + {% endif %} + + {{ return(objects) }} + +{% endmacro %} diff --git a/macros/upload_results/upload_results.sql b/macros/upload_results/upload_results.sql index dc8c116d..0875aa3b 100644 --- a/macros/upload_results/upload_results.sql +++ b/macros/upload_results/upload_results.sql @@ -19,24 +19,7 @@ {% set dataset_relation = dbt_artifacts.get_relation(dataset) %} {# Get the results that need to be uploaded #} - - {% if dataset in ['model_executions', 'seed_executions', 'test_executions', 'snapshot_executions'] %} 
- {# Executions make use of the results object #} - {% set objects = results | selectattr("node.resource_type", "equalto", dataset.split("_")[0]) | list %} - {% elif dataset in ['seeds', 'snapshots', 'tests', 'models'] %} - {# Use the nodes in the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details #} - {% set objects = graph.nodes.values() | selectattr("resource_type", "equalto", dataset[:-1]) | list %} - {% elif dataset in ['exposures', 'sources'] %} - {# Use the [graph](https://docs.getdbt.com/reference/dbt-jinja-functions/graph) to extract details #} - {% set objects = graph.get(dataset).values() | list %} - {% elif dataset == 'invocations' %} - {# - Invocations doesn't need anything input, but we include this so that it will still be picked up - as part of the loop below - the length must be >0 to allow for an upload, hence the empty string - #} - {% set objects = [''] %} - {% endif %} - + {% set objects = dbt_artifacts.get_dataset_content() %} {# Upload in chunks to reduce query size #} {% if dataset == 'model' %} From 7b3770ebbc4162ca215af82dab1e6a9c43c75b2e Mon Sep 17 00:00:00 2001 From: Gemma Down Date: Wed, 13 Sep 2023 15:07:25 +0100 Subject: [PATCH 10/15] Move the get table content into its own macro --- macros/_macros.yml | 13 ++++++++ .../get_table_content_values.sql | 32 +++++++++++++++++++ macros/upload_results/upload_results.sql | 32 +------------------ 3 files changed, 46 insertions(+), 31 deletions(-) create mode 100644 macros/upload_results/get_table_content_values.sql diff --git a/macros/_macros.yml b/macros/_macros.yml index bc0280f5..7b798447 100644 --- a/macros/_macros.yml +++ b/macros/_macros.yml @@ -196,6 +196,19 @@ macros: description: | The name of the dataset to return the data for e.g. `models` + - name: get_table_content_values + description: | + A macro to create the insert statement values required to be uploaded to the table. 
+ arguments: + - name: dataset + type: string + description: | + The name of the dataset to return the column names for e.g. `models` + - name: objects_to_upload + type: list + description: | + The objects to be used to generate the insert statement values - extracted from `get_dataset_content` + - name: insert_into_metadata_table description: | Dependent on the adapter type, the wrapper to insert the data into a table from a list of values. Used in the diff --git a/macros/upload_results/get_table_content_values.sql b/macros/upload_results/get_table_content_values.sql new file mode 100644 index 00000000..277899c5 --- /dev/null +++ b/macros/upload_results/get_table_content_values.sql @@ -0,0 +1,32 @@ +{% macro get_table_content_values(dataset, objects_to_upload) %} + + {# Convert the results to data to be imported #} + + {% if dataset == 'model_executions' %} + {% set content = dbt_artifacts.upload_model_executions(objects_to_upload) %} + {% elif dataset == 'seed_executions' %} + {% set content = dbt_artifacts.upload_seed_executions(objects_to_upload) %} + {% elif dataset == 'test_executions' %} + {% set content = dbt_artifacts.upload_test_executions(objects_to_upload) %} + {% elif dataset == 'snapshot_executions' %} + {% set content = dbt_artifacts.upload_snapshot_executions(objects_to_upload) %} + {% elif dataset == 'exposures' %} + {% set content = dbt_artifacts.upload_exposures(objects_to_upload) %} + {% elif dataset == 'models' %} + {% set content = dbt_artifacts.upload_models(objects_to_upload) %} + {% elif dataset == 'seeds' %} + {% set content = dbt_artifacts.upload_seeds(objects_to_upload) %} + {% elif dataset == 'snapshots' %} + {% set content = dbt_artifacts.upload_snapshots(objects_to_upload) %} + {% elif dataset == 'sources' %} + {% set content = dbt_artifacts.upload_sources(objects_to_upload) %} + {% elif dataset == 'tests' %} + {% set content = dbt_artifacts.upload_tests(objects_to_upload) %} + {# Invocations only requires data from variables 
available in the macro #} + {% elif dataset == 'invocations' %} + {% set content = dbt_artifacts.upload_invocations() %} + {% endif %} + + {{ return(content) }} + +{% endmacro %} diff --git a/macros/upload_results/upload_results.sql b/macros/upload_results/upload_results.sql index 0875aa3b..0c56776a 100644 --- a/macros/upload_results/upload_results.sql +++ b/macros/upload_results/upload_results.sql @@ -32,37 +32,7 @@ {% for i in range(0, objects | length, upload_limit) -%} {# Get just the objects to load on this loop #} - {% set objects_to_upload = objects[i: i + upload_limit] %} - - {# Convert the results to data to be imported #} - {% set content %} - - {% if dataset == 'model_executions' %} - {{ dbt_artifacts.upload_model_executions(objects_to_upload) }} - {% elif dataset == 'seed_executions' %} - {{ dbt_artifacts.upload_seed_executions(objects_to_upload) }} - {% elif dataset == 'test_executions' %} - {{ dbt_artifacts.upload_test_executions(objects_to_upload) }} - {% elif dataset == 'snapshot_executions' %} - {{ dbt_artifacts.upload_snapshot_executions(objects_to_upload) }} - {% elif dataset == 'exposures' %} - {{ dbt_artifacts.upload_exposures(objects_to_upload) }} - {% elif dataset == 'models' %} - {{ dbt_artifacts.upload_models(objects_to_upload) }} - {% elif dataset == 'seeds' %} - {{ dbt_artifacts.upload_seeds(objects_to_upload) }} - {% elif dataset == 'snapshots' %} - {{ dbt_artifacts.upload_snapshots(objects_to_upload) }} - {% elif dataset == 'sources' %} - {{ dbt_artifacts.upload_sources(objects_to_upload) }} - {% elif dataset == 'tests' %} - {{ dbt_artifacts.upload_tests(objects_to_upload) }} - {# Invocations only requires data from variables available in the macro #} - {% elif dataset == 'invocations' %} - {{ dbt_artifacts.upload_invocations() }} - {% endif %} - - {% endset %} + {% set content = get_table_content_values(dataset, objects[i: i + upload_limit]) %} {# Insert the content into the metadata table #} {{ 
dbt_artifacts.insert_into_metadata_table( From c0e3832e8508ac7e30f74a9d663a58cfb6ac438d Mon Sep 17 00:00:00 2001 From: Gemma Down Date: Wed, 13 Sep 2023 15:25:24 +0100 Subject: [PATCH 11/15] Bug fixing --- macros/upload_results/upload_results.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/macros/upload_results/upload_results.sql b/macros/upload_results/upload_results.sql index 0c56776a..5f4df915 100644 --- a/macros/upload_results/upload_results.sql +++ b/macros/upload_results/upload_results.sql @@ -19,7 +19,7 @@ {% set dataset_relation = dbt_artifacts.get_relation(dataset) %} {# Get the results that need to be uploaded #} - {% set objects = dbt_artifacts.get_dataset_content() %} + {% set objects = dbt_artifacts.get_dataset_content(dataset) %} {# Upload in chunks to reduce query size #} {% if dataset == 'model' %} @@ -32,7 +32,7 @@ {% for i in range(0, objects | length, upload_limit) -%} {# Get just the objects to load on this loop #} - {% set content = get_table_content_values(dataset, objects[i: i + upload_limit]) %} + {% set content = dbt_artifacts.get_table_content_values(dataset, objects[i: i + upload_limit]) %} {# Insert the content into the metadata table #} {{ dbt_artifacts.insert_into_metadata_table( From d368961bf2bd9937fd56acf711a5ff693eb08cf0 Mon Sep 17 00:00:00 2001 From: Gemma Down Date: Thu, 14 Sep 2023 12:26:02 +0100 Subject: [PATCH 12/15] Move relation definition to insert macro to avoid quoting issues --- .../insert_into_metadata_table.sql | 23 +++++++++++-------- macros/upload_results/upload_results.sql | 7 +----- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/macros/upload_results/insert_into_metadata_table.sql b/macros/upload_results/insert_into_metadata_table.sql index 37994249..2ec4d5d5 100644 --- a/macros/upload_results/insert_into_metadata_table.sql +++ b/macros/upload_results/insert_into_metadata_table.sql @@ -1,15 +1,20 @@ -{% macro insert_into_metadata_table(database_name, schema_name, 
table_name, fields, content) -%} +{% macro insert_into_metadata_table(dataset, fields, content) -%} {% if content != "" %} - {{ return(adapter.dispatch('insert_into_metadata_table', 'dbt_artifacts')(database_name, schema_name, table_name, fields, content)) }} + + {# Get the relation that the results will be uploaded to #} + {% set dataset_relation = dbt_artifacts.get_relation(dataset) %} + {# Insert the data into the table #} + {{ return(adapter.dispatch('insert_into_metadata_table', 'dbt_artifacts')(dataset_relation, fields, content)) }} + {% endif %} {%- endmacro %} -{% macro spark__insert_into_metadata_table(database_name, schema_name, table_name, fields, content) -%} +{% macro spark__insert_into_metadata_table(relation, fields, content) -%} {% set insert_into_table_query %} - insert into {% if database_name %}{{ database_name }}.{% endif %}{{ schema_name }}.{{ table_name }} {{ fields }} + insert into {{ relation }} {{ fields }} {{ content }} {% endset %} @@ -17,10 +22,10 @@ {%- endmacro %} -{% macro snowflake__insert_into_metadata_table(database_name, schema_name, table_name, fields, content) -%} +{% macro snowflake__insert_into_metadata_table(relation, fields, content) -%} {% set insert_into_table_query %} - insert into {{database_name}}.{{ schema_name }}.{{ table_name }} {{ fields }} + insert into {{ relation }} {{ fields }} {{ content }} {% endset %} @@ -28,10 +33,10 @@ {%- endmacro %} -{% macro bigquery__insert_into_metadata_table(database_name, schema_name, table_name, fields, content) -%} +{% macro bigquery__insert_into_metadata_table(relation, fields, content) -%} {% set insert_into_table_query %} - insert into `{{database_name}}.{{ schema_name }}.{{ table_name }}` {{ fields }} + insert into {{ relation }} {{ fields }} values {{ content }} {% endset %} @@ -40,5 +45,5 @@ {%- endmacro %} -{% macro default__insert_into_metadata_table(database_name, schema_name, table_name, fields, content) -%} +{% macro default__insert_into_metadata_table(relation, fields, 
content) -%} {%- endmacro %} diff --git a/macros/upload_results/upload_results.sql b/macros/upload_results/upload_results.sql index 5f4df915..fcadc199 100644 --- a/macros/upload_results/upload_results.sql +++ b/macros/upload_results/upload_results.sql @@ -15,9 +15,6 @@ {% do log("Uploading " ~ dataset.replace("_", " "), true) %} - {# Get the relation that the results will be uploaded to #} - {% set dataset_relation = dbt_artifacts.get_relation(dataset) %} - {# Get the results that need to be uploaded #} {% set objects = dbt_artifacts.get_dataset_content(dataset) %} @@ -36,9 +33,7 @@ {# Insert the content into the metadata table #} {{ dbt_artifacts.insert_into_metadata_table( - database_name=dataset_relation.database, - schema_name=dataset_relation.schema, - table_name=dataset_relation.identifier, + dataset=dataset, fields=dbt_artifacts.get_column_name_list(dataset), content=content ) From 54aa3fd8afc55537f337eaf12224c458a6697306 Mon Sep 17 00:00:00 2001 From: Gemma Down Date: Thu, 14 Sep 2023 12:59:36 +0100 Subject: [PATCH 13/15] Revert "Remove unused `rows_affected` column in `test_executions` #195" This reverts commit 9182fdcb281362d04831371636757846cef2c577. 
--- macros/upload_individual_datasets/upload_test_executions.sql | 5 ++++- macros/upload_results/get_column_name_lists.sql | 1 + models/fct_dbt__test_executions.sql | 1 + models/fct_dbt__test_executions.yml | 2 ++ models/sources/test_executions.sql | 1 + models/sources/test_executions.yml | 2 ++ models/staging/stg_dbt__test_executions.sql | 1 + models/staging/stg_dbt__test_executions.yml | 2 ++ 8 files changed, 14 insertions(+), 1 deletion(-) diff --git a/macros/upload_individual_datasets/upload_test_executions.sql b/macros/upload_individual_datasets/upload_test_executions.sql index 84ef217b..60986658 100644 --- a/macros/upload_individual_datasets/upload_test_executions.sql +++ b/macros/upload_individual_datasets/upload_test_executions.sql @@ -17,7 +17,8 @@ {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(9) }}, {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(10) }}, {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(11) }}, - {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(12)) }} + {{ adapter.dispatch('column_identifier', 'dbt_artifacts')(12) }}, + {{ adapter.dispatch('parse_json', 'dbt_artifacts')(adapter.dispatch('column_identifier', 'dbt_artifacts')(13)) }} from values {% for test in tests -%} ( @@ -56,6 +57,7 @@ {% endif %} {{ test.execution_time }}, {# total_node_runtime #} + null, {# rows_affected not available in Databricks #} {{ 'null' if test.failures is none else test.failures }}, {# failures #} '{{ test.message | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}', {# message #} '{{ tojson(test.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"') }}' {# adapter_response #} @@ -109,6 +111,7 @@ {% endif %} {{ test.execution_time }}, {# total_node_runtime #} + null, {# rows_affected not available in Databricks #} {{ 'null' if test.failures is none else test.failures }}, {# failures #} '{{ test.message | replace("\\", 
"\\\\") | replace("'", "\\'") | replace('"', '\\"') | replace("\n", "\\n") }}', {# message #} {{ adapter.dispatch('parse_json', 'dbt_artifacts')(tojson(test.adapter_response) | replace("\\", "\\\\") | replace("'", "\\'") | replace('"', '\\"')) }} {# adapter_response #} diff --git a/macros/upload_results/get_column_name_lists.sql b/macros/upload_results/get_column_name_lists.sql index eb107e33..050974cb 100644 --- a/macros/upload_results/get_column_name_lists.sql +++ b/macros/upload_results/get_column_name_lists.sql @@ -211,6 +211,7 @@ compile_started_at, query_completed_at, total_node_runtime, + rows_affected, failures, message, adapter_response diff --git a/models/fct_dbt__test_executions.sql b/models/fct_dbt__test_executions.sql index 5e227af5..b921df43 100644 --- a/models/fct_dbt__test_executions.sql +++ b/models/fct_dbt__test_executions.sql @@ -18,6 +18,7 @@ test_executions as ( compile_started_at, query_completed_at, total_node_runtime, + rows_affected, failures, message from base diff --git a/models/fct_dbt__test_executions.yml b/models/fct_dbt__test_executions.yml index 6541a7ff..35cae002 100644 --- a/models/fct_dbt__test_executions.yml +++ b/models/fct_dbt__test_executions.yml @@ -14,6 +14,8 @@ models: description: '{{ doc("node_id") }}' - name: query_completed_at description: '{{ doc("query_completed_at") }}' + - name: rows_affected + description: '{{ doc("rows_affected") }}' - name: status description: '{{ doc("status") }}' - name: test_execution_id diff --git a/models/sources/test_executions.sql b/models/sources/test_executions.sql index 4f0e19f7..50d37980 100644 --- a/models/sources/test_executions.sql +++ b/models/sources/test_executions.sql @@ -13,6 +13,7 @@ select cast(null as {{ type_timestamp() }}) as compile_started_at, cast(null as {{ type_timestamp() }}) as query_completed_at, cast(null as {{ type_float() }}) as total_node_runtime, + cast(null as {{ type_int() }}) as rows_affected, cast(null as {{ type_int() }}) as failures, cast(null as {{ 
type_string() }}) as message, cast(null as {{ type_json() }}) as adapter_response diff --git a/models/sources/test_executions.yml b/models/sources/test_executions.yml index 4449a37d..e851c7c5 100644 --- a/models/sources/test_executions.yml +++ b/models/sources/test_executions.yml @@ -22,6 +22,8 @@ models: description: '{{ doc("query_completed_at") }}' - name: total_node_runtime description: '{{ doc("total_node_runtime") }}' + - name: rows_affected + description: '{{ doc("rows_affected") }}' - name: failures description: '{{ doc("failures") }}' - name: message diff --git a/models/staging/stg_dbt__test_executions.sql b/models/staging/stg_dbt__test_executions.sql index 9ace3de5..ca7ea868 100644 --- a/models/staging/stg_dbt__test_executions.sql +++ b/models/staging/stg_dbt__test_executions.sql @@ -18,6 +18,7 @@ enhanced as ( compile_started_at, query_completed_at, total_node_runtime, + rows_affected, failures, message from base diff --git a/models/staging/stg_dbt__test_executions.yml b/models/staging/stg_dbt__test_executions.yml index aab7cfd2..93d62fe4 100644 --- a/models/staging/stg_dbt__test_executions.yml +++ b/models/staging/stg_dbt__test_executions.yml @@ -14,6 +14,8 @@ models: description: '{{ doc("node_id") }}' - name: query_completed_at description: '{{ doc("query_completed_at") }}' + - name: rows_affected + description: '{{ doc("rows_affected") }}' - name: run_started_at description: '{{ doc("run_started_at") }}' - name: status From 642842dc3d176e0f165c123e248b2d106a6224d6 Mon Sep 17 00:00:00 2001 From: Gemma Down Date: Thu, 14 Sep 2023 13:00:12 +0100 Subject: [PATCH 14/15] Remove ---debug left in databricks tox --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 2070a7eb..fbff0176 100644 --- a/tox.ini +++ b/tox.ini @@ -166,7 +166,7 @@ deps = dbt-databricks~=1.6.0 commands = dbt clean dbt deps - dbt --debug build --target databricks + dbt build --target databricks [testenv:integration_databricks_1_3_0] 
changedir = integration_test_project From 18e371a664724cfe3bb6db823f09982848f7aa7b Mon Sep 17 00:00:00 2001 From: Gemma Down <52132406+glsdown@users.noreply.github.com> Date: Mon, 18 Sep 2023 14:18:07 +0100 Subject: [PATCH 15/15] Apply suggestions from code review Co-authored-by: Jared Rimmer <100997264+jared-rimmer@users.noreply.github.com> --- macros/database_specific_helpers/get_relation.sql | 4 ++-- macros/upload_results/get_column_name_lists.sql | 11 ----------- 2 files changed, 2 insertions(+), 13 deletions(-) diff --git a/macros/database_specific_helpers/get_relation.sql b/macros/database_specific_helpers/get_relation.sql index 5954032d..ed7d02e5 100644 --- a/macros/database_specific_helpers/get_relation.sql +++ b/macros/database_specific_helpers/get_relation.sql @@ -1,6 +1,6 @@ -{% macro get_relation(get_relation_name) %} +{% macro get_relation(relation_name) %} {% if execute %} - {% set model_get_relation_node = graph.nodes.values() | selectattr('name', 'equalto', get_relation_name) | first %} + {% set model_get_relation_node = graph.nodes.values() | selectattr('name', 'equalto', relation_name) | first %} {% set relation = api.Relation.create( database = model_get_relation_node.database, schema = model_get_relation_node.schema, diff --git a/macros/upload_results/get_column_name_lists.sql b/macros/upload_results/get_column_name_lists.sql index 050974cb..ea708467 100644 --- a/macros/upload_results/get_column_name_lists.sql +++ b/macros/upload_results/get_column_name_lists.sql @@ -6,7 +6,6 @@ {% macro get_column_name_list(dataset) -%} - {# Exposures #} {% if dataset == 'exposures' %} ( @@ -26,7 +25,6 @@ all_results ) - {# Invocations #} {% elif dataset == 'invocations' %} ( @@ -51,7 +49,6 @@ dbt_custom_envs ) - {# Model Executions #} {% elif dataset == 'model_executions' %} ( @@ -76,7 +73,6 @@ adapter_response ) - {# Models #} {% elif dataset == 'models' %} ( @@ -98,7 +94,6 @@ ) - {# Seed Executions #} {% elif dataset == 'seed_executions' %} ( @@ -120,7 
+115,6 @@ adapter_response ) - {# Seeds #} {% elif dataset == 'seeds' %} ( @@ -138,7 +132,6 @@ all_results ) - {# Snapshot Executions #} {% elif dataset == 'snapshot_executions' %} ( @@ -160,7 +153,6 @@ adapter_response ) - {# Snapshots #} {% elif dataset == 'snapshots' %} ( @@ -180,7 +172,6 @@ all_results ) - {# Sources #} {% elif dataset == 'sources' %} ( @@ -198,7 +189,6 @@ all_results ) - {# Test Executions #} {% elif dataset == 'test_executions' %} ( @@ -217,7 +207,6 @@ adapter_response ) - {# Tests #} {% elif dataset == 'tests' %} (