From f4351142ae73c98ce690816b462adba37c382d2c Mon Sep 17 00:00:00 2001
From: Alan Cruickshank
Date: Tue, 22 Feb 2022 09:18:55 +0000
Subject: [PATCH] Flatten artifacts on load (#84)

* Add V2 artifact upload method which flattens on load, avoiding 16MB field limit
---
 README.md                                     |  80 ++++++++----
 .../macros/drop_test_schema.sql               |  11 ++
 .../tests/check_model_executions.sql          |  37 ++++++
 .../tests/check_node_executions.sql           |  37 ++++++
 .../tests/check_run_results.sql               |  25 ++++
 macros/artifact_run.sql                       |   4 +
 macros/create_artifact_resources.sql          |  96 +++++++++++++-
 macros/flatten_manifest.sql                   |  85 ++++++++++++
 macros/flatten_results.sql                    |  32 +++++
 macros/upload_artifacts_v2.sql                | 122 ++++++++++++++++++
 models/staging/sources.yml                    |  61 ++++++++-
 models/staging/stg_dbt__artifacts.sql         |   3 +-
 models/staging/stg_dbt__node_executions.sql   |  41 +++---
 models/staging/stg_dbt__nodes.sql             |  85 ++---------
 models/staging/stg_dbt__run_results.sql       |  33 +++--
 .../staging/stg_dbt__run_results_env_keys.sql |  18 +--
 tox.ini                                       |  17 +++
 17 files changed, 635 insertions(+), 152 deletions(-)
 create mode 100644 integration_test_project/macros/drop_test_schema.sql
 create mode 100644 integration_test_project/tests/check_model_executions.sql
 create mode 100644 integration_test_project/tests/check_node_executions.sql
 create mode 100644 integration_test_project/tests/check_run_results.sql
 create mode 100644 macros/artifact_run.sql
 create mode 100644 macros/flatten_manifest.sql
 create mode 100644 macros/flatten_results.sql
 create mode 100644 macros/upload_artifacts_v2.sql

diff --git a/README.md b/README.md
index 38eadcd1..28c2ef5a 100644
--- a/README.md
+++ b/README.md
@@ -32,6 +32,11 @@ vars:
   dbt_artifacts_database: your_db # optional, default is your target database
   dbt_artifacts_schema: your_schema # optional, default is 'dbt_artifacts'
   dbt_artifacts_table: your_table # optional, default is 'artifacts'
+  dbt_artifacts_results_table: your_table # optional, default is 'dbt_run_results'
+  dbt_artifacts_result_nodes_table: your_table # optional, default is 'dbt_run_results_nodes'
+  dbt_artifacts_manifest_nodes_table: your_table # optional, default is 'dbt_run_manifest_nodes'
+  dbt_artifacts_manifest_sources_table: your_table # optional, default is 'dbt_run_manifest_sources'
+  dbt_artifacts_manifest_exposures_table: your_table # optional, default is 'dbt_run_manifest_exposures'
 
 models:
   ...
@@ -45,33 +50,58 @@ Note that the model materializations are defined in this package's `dbt_project.
 
 3. Run `dbt deps`.
 
-## Generating the source table
-This package requires that the source data exists in a table in Snowflake.
-
-### Option 1: Loading local files
+## Uploading the artifacts
+This package uploads the artifact files into Snowflake. There are two supported ways of doing this:
+- The _V2_ method, which flattens the uploaded files on load. This supports files over
+  16MB (the limit of a variant field in Snowflake) and also makes rebuilds of the materialised
+  models much faster because the JSON unpacking is done once on load. The downside of this approach
+  is that the upload is much heavier and more complex; as such, we only directly support the
+  _"local file"_ method. Loading via cloud storage is also _possible_, but we recommend users
+  copy the method used in `upload_artifacts_v2.sql` to create their own approach.
+- The _V1_ or _legacy_ option, which uploads the files unprocessed. This affords much more flexibility
+  in their use, but is subject to field size limits and higher compute loads to reprocess the
+  large JSON payloads in future. This may be appropriate for more custom setups or for small projects,
+  but for large projects which aren't extending the functionality of the package significantly, we
+  recommend the _V2_ method.
+
+### Option 1: Loading local files [V1 & V2]
 Snowflake makes it possible to load local files into your warehouse. We've included a number of macros to assist with this. This method can be used by both dbt Cloud users, and users of other orchestration tools.
 
-1. To initially create these tables, execute `dbt run-operation create_artifact_resources` ([source](macros/create_artifact_resources.sql)). This will create a stage and a table named `{{ target.database }}.dbt_artifacts.artifacts` — you can override this name using the variables listed in the Installation section, above.
-
-2. Add [operations](https://docs.getdbt.com/docs/building-a-dbt-project/hooks-operations/#operations) to your production run to load files into your table, via the `upload_artifacts` macro ([source](macros/upload_artifacts.sql)). You'll need to specify which files to upload through use of the `--args` flag. Here's an example setup.
-```txt
-$ dbt seed
-$ dbt run-operation upload_dbt_artifacts --args '{filenames: [manifest, run_results]}'
-
-$ dbt run
-$ dbt run-operation upload_dbt_artifacts --args '{filenames: [manifest, run_results]}'
-
-$ dbt test
-$ dbt run-operation upload_dbt_artifacts --args '{filenames: [run_results]}'
-
-$ dbt source snapshot-freshness
-$ dbt run-operation upload_dbt_artifacts --args '{filenames: [sources]}'
-
-$ dbt docs generate
-$ dbt run-operation upload_dbt_artifacts --args '{filenames: [catalog]}'
-```
-
-### Option 2: Loading cloud-storage files
+1. To initially create these tables, execute `dbt run-operation create_artifact_resources`
+   ([source](macros/create_artifact_resources.sql)). This will create a stage and a set of tables in
+   the `{{ target.database }}.dbt_artifacts` schema — you can override the database, schema and table
+   names using the variables listed in the Installation section, above.
+
+2. Add [operations](https://docs.getdbt.com/docs/building-a-dbt-project/hooks-operations/#operations)
+   to your production run to load files into your table.
+
+   **V2 Macro**: Use the `upload_dbt_artifacts_v2` macro ([source](macros/upload_artifacts_v2.sql)). You only
+   need to run the macro after `run`, `test`, `seed`, `snapshot` or `build` operations.
+   ```txt
+   $ dbt run
+   $ dbt run-operation upload_dbt_artifacts_v2
+   ```
+
+   **V1 Macro**: Use the `upload_dbt_artifacts` macro ([source](macros/upload_artifacts.sql)). You'll need
+   to specify which files to upload through use of the `--args` flag. Here's an example setup.
+ ```txt + $ dbt seed + $ dbt run-operation upload_dbt_artifacts --args '{filenames: [manifest, run_results]}' + + $ dbt run + $ dbt run-operation upload_dbt_artifacts --args '{filenames: [manifest, run_results]}' + + $ dbt test + $ dbt run-operation upload_dbt_artifacts --args '{filenames: [run_results]}' + + $ dbt source snapshot-freshness + $ dbt run-operation upload_dbt_artifacts --args '{filenames: [sources]}' + + $ dbt docs generate + $ dbt run-operation upload_dbt_artifacts --args '{filenames: [catalog]}' + ``` + +### Option 2: Loading cloud-storage files [V1 only] If you are using an orchestrator, you might instead upload these files to cloud storage — the method to do this will depend on your orchestrator. Then, link the cloud storage destination to a Snowflake external stage, and use a snowpipe to copy these files into the source table: diff --git a/integration_test_project/macros/drop_test_schema.sql b/integration_test_project/macros/drop_test_schema.sql new file mode 100644 index 00000000..034b68ef --- /dev/null +++ b/integration_test_project/macros/drop_test_schema.sql @@ -0,0 +1,11 @@ +{% macro drop_test_schema() %} + +-- We drop if exists so that it still passes when the db is clean. +{% set drop_schema_query %} + drop schema if exists {{ target.schema }}; +{% endset %} + +{% do log("Dropping test schema: " ~ drop_schema_query, info=True) %} +{% do run_query(drop_schema_query) %} + +{% endmacro %} diff --git a/integration_test_project/tests/check_model_executions.sql b/integration_test_project/tests/check_model_executions.sql new file mode 100644 index 00000000..d2e5d421 --- /dev/null +++ b/integration_test_project/tests/check_model_executions.sql @@ -0,0 +1,37 @@ +with raw_model_executions as ( + + select * from {{ ref('fct_dbt__model_executions') }} + +), + +grouped_executions as ( + + select + artifact_run_id, + count(*) as runs + from raw_model_executions + group by artifact_run_id + +), + +expected_results as ( + + select + artifact_run_id, + runs, + -- Hard coded expected results. Potentially to improve later. + case artifact_run_id + when 'b27910c784063dc867a762eb91ac7e93033492ac49b482215cd1761824b07a58' then 31 -- build + when '1ab40ec436539434416dfca0bb0e8d8cf3708bb568fb2385321a192b59b9c4e7' then 31 -- build_full_refresh + when 'c6775fc1f3d39acb37f389df8b67aa59cb989994dc9b940b51e7bcba830212a3' then 31 -- run + when '4fbd1feb6cfc3cd088fc47ac461efdfab7f95380aa5a939360da629bbdb9ce1d' then 31 -- run_full_refresh + when '6ee8780f7533ae3901f8759fd07ddae4af20b7856c788bf515bdf14ee059e90d' then 0 -- seed + when '1c87fbb828af7f041f0d7d4440904a8e482a8be74e617eb57a11b76001936550' then 0 -- snapshot + when '37f4a0fca17b0f8f1fb0db04fbef311dd73cacfcd6653c76d46e3d7f36dc079c' then 0 -- test + else 0 + end as expected_runs + from grouped_executions + where runs != expected_runs +) + +select * from expected_results diff --git a/integration_test_project/tests/check_node_executions.sql b/integration_test_project/tests/check_node_executions.sql new file mode 100644 index 00000000..f1cadcf9 --- /dev/null +++ b/integration_test_project/tests/check_node_executions.sql @@ -0,0 +1,37 @@ +with raw_node_executions as ( + + select * from {{ ref('stg_dbt__node_executions') }} + +), + +grouped_executions as ( + + select + artifact_run_id, + count(*) as runs + from raw_node_executions + group by artifact_run_id + +), + +expected_results as ( + + select + artifact_run_id, + runs, + -- Hard coded expected results. Potentially to improve later. 
+ case artifact_run_id + when 'b27910c784063dc867a762eb91ac7e93033492ac49b482215cd1761824b07a58' then 51 -- build + when '1ab40ec436539434416dfca0bb0e8d8cf3708bb568fb2385321a192b59b9c4e7' then 51 -- build_full_refresh + when 'c6775fc1f3d39acb37f389df8b67aa59cb989994dc9b940b51e7bcba830212a3' then 31 -- run + when '4fbd1feb6cfc3cd088fc47ac461efdfab7f95380aa5a939360da629bbdb9ce1d' then 31 -- run_full_refresh + when '6ee8780f7533ae3901f8759fd07ddae4af20b7856c788bf515bdf14ee059e90d' then 1 -- seed + when '1c87fbb828af7f041f0d7d4440904a8e482a8be74e617eb57a11b76001936550' then 1 -- snapshot + when '37f4a0fca17b0f8f1fb0db04fbef311dd73cacfcd6653c76d46e3d7f36dc079c' then 18 -- test + else 0 + end as expected_runs + from grouped_executions + where runs != expected_runs +) + +select * from expected_results diff --git a/integration_test_project/tests/check_run_results.sql b/integration_test_project/tests/check_run_results.sql new file mode 100644 index 00000000..8d043296 --- /dev/null +++ b/integration_test_project/tests/check_run_results.sql @@ -0,0 +1,25 @@ +with raw_runs as ( + + select * from {{ ref('stg_dbt__run_results') }} + +), + +grouped_runs as ( + + select + count(*) as runs + from raw_runs + +), + +expected_results as ( + + select + runs, + -- Hard coded expected results. Potentially to improve later. + 7 as expected_runs + from grouped_runs + where runs != expected_runs +) + +select * from expected_results diff --git a/macros/artifact_run.sql b/macros/artifact_run.sql new file mode 100644 index 00000000..e43dc013 --- /dev/null +++ b/macros/artifact_run.sql @@ -0,0 +1,4 @@ +-- This ID provides a reliable ID, regardless of whether running in a local or cloud environment. +{% macro make_artifact_run_id() %} + sha2_hex(coalesce(dbt_cloud_run_id::string, command_invocation_id::string), 256) +{% endmacro %} diff --git a/macros/create_artifact_resources.sql b/macros/create_artifact_resources.sql index c6a431b6..07ab15b4 100644 --- a/macros/create_artifact_resources.sql +++ b/macros/create_artifact_resources.sql @@ -1,29 +1,113 @@ {% macro create_artifact_resources() %} {% set src_dbt_artifacts = source('dbt_artifacts', 'artifacts') %} +{% set artifact_stage = var('dbt_artifacts_stage', 'dbt_artifacts_stage') %} + +{% set src_results = source('dbt_artifacts', 'dbt_run_results') %} +{% set src_results_nodes = source('dbt_artifacts', 'dbt_run_results_nodes') %} +{% set src_manifest_nodes = source('dbt_artifacts', 'dbt_run_manifest_nodes') %} {{ create_schema(src_dbt_artifacts) }} -{% set create_stage_query %} +{% set create_v1_stage_query %} create stage if not exists {{ src_dbt_artifacts }} file_format = (type = json); {% endset %} -{% set create_table_query %} +{% set create_v2_stage_query %} +create stage if not exists {{ artifact_stage }} +file_format = (type = json); +{% endset %} + +{% set create_v1_table_query %} create table if not exists {{ src_dbt_artifacts }} ( data variant, generated_at timestamp, path string, artifact_type string ); +{% endset %} +{% set create_v2_results_query %} +create table if not exists {{ src_results }} ( + command_invocation_id string, + dbt_cloud_run_id int, + artifact_run_id string, + artifact_generated_at timestamp_tz, + dbt_version string, + env variant, + elapsed_time double, + execution_command string, + was_full_refresh boolean, + selected_models variant, + target string, + metadata variant, + args variant +); {% endset %} +{% set create_v2_result_nodes_table_query %} +create table if not exists {{ src_results_nodes }} ( + command_invocation_id string, + 
dbt_cloud_run_id int, + artifact_run_id string, + artifact_generated_at timestamp_tz, + execution_command string, + was_full_refresh boolean, + node_id string, + thread_id integer, + status string, + message string, + compile_started_at timestamp_tz, + query_completed_at timestamp_tz, + total_node_runtime float, + rows_affected int, + result_json variant +); +{% endset %} + +{% set create_v2_manifest_nodes_table_query %} +create table if not exists {{ src_manifest_nodes }} ( + command_invocation_id string, + dbt_cloud_run_id int, + artifact_run_id string, + artifact_generated_at timestamp_tz, + node_id string, + resource_type string, + node_database string, + node_schema string, + name string, + depends_on_nodes array, + depends_on_sources array, + exposure_type string, + exposure_owner string, + exposure_maturity string, + source_name string, + package_name string, + relation_name string, + node_path string, + checksum string, + materialization string, + node_json variant +); +{% endset %} + +{% do log("Creating V1 Stage: " ~ create_v1_stage_query, info=True) %} +{% do run_query(create_v1_stage_query) %} + +{% do log("Creating V2 Stage: " ~ create_v2_stage_query, info=True) %} +{% do run_query(create_v2_stage_query) %} + +{% do log("Creating V1 Table: " ~ create_v1_table_query, info=True) %} +{% do run_query(create_v1_table_query) %} + +{% do log("Creating V2 Results Table: " ~ create_v2_results_query, info=True) %} +{% do run_query(create_v2_results_query) %} -{% do log("Creating Stage: " ~ create_stage_query, info=True) %} -{% do run_query(create_stage_query) %} +{% do log("Creating V2 Result Nodes Table: " ~ create_v2_result_nodes_table_query, info=True) %} +{% do run_query(create_v2_result_nodes_table_query) %} -{% do log("Creating Table: " ~ create_table_query, info=True) %} -{% do run_query(create_table_query) %} +{% do log("Creating V2 Manifest Nodes Table: " ~ create_v2_manifest_nodes_table_query, info=True) %} +{% do run_query(create_v2_manifest_nodes_table_query) %} {% endmacro %} diff --git a/macros/flatten_manifest.sql b/macros/flatten_manifest.sql new file mode 100644 index 00000000..ceaee22f --- /dev/null +++ b/macros/flatten_manifest.sql @@ -0,0 +1,85 @@ +{% macro flatten_manifest(manifest_cte_name) %} + + select + manifests.command_invocation_id, + manifests.dbt_cloud_run_id, + manifests.artifact_run_id, + manifests.generated_at::timestamp_tz as artifact_generated_at, + node.key as node_id, + node.value:resource_type::string as resource_type, + node.value:database::string as node_database, + node.value:schema::string as node_schema, + node.value:name::string as name, + to_array(node.value:depends_on:nodes) as depends_on_nodes, + null as depends_on_sources, + null as exposure_type, + null as exposure_owner, + null as exposure_maturity, + null as source_name, + node.value:package_name::string as package_name, + null as relation_name, + node.value:path::string as node_path, + node.value:checksum.checksum::string as checksum, + node.value:config.materialized::string as materialization, + -- Include the raw JSON for future proofing. 
+ node.value as node_json + from {{ manifest_cte_name }} as manifests, + lateral flatten(input => manifests.data:nodes) as node + + union all + + select + manifests.command_invocation_id, + manifests.dbt_cloud_run_id, + manifests.artifact_run_id, + manifests.generated_at::timestamp_tz as artifact_generated_at, + exposure.key as node_id, + 'exposure' as resource_type, + null as node_database, + null as node_schema, + exposure.value:name::string as name, + to_array(exposure.value:depends_on:nodes) as depends_on_nodes, + to_array(exposure.value:sources:nodes) as depends_on_sources, + exposure.value:type::string as exposure_type, + exposure.value:owner:name::string as exposure_owner, + exposure.value:maturity::string as exposure_maturity, + null as source_name, + exposure.value:package_name::string as package_name, + null as relation_name, + null as node_path, + null as checksum, + null as materialization, + -- Include the raw JSON for future proofing. + exposure.value as node_json + from {{ manifest_cte_name }} as manifests, + lateral flatten(input => manifests.data:exposures) as exposure + + union all + + select + manifests.command_invocation_id, + manifests.dbt_cloud_run_id, + manifests.artifact_run_id, + manifests.generated_at::timestamp_tz as artifact_generated_at, + source.key as node_id, + 'source' as resource_type, + source.value:database::string as node_database, + source.value:schema::string as node_schema, + source.value:name::string::string as name, + null as depends_on_nodes, + null as depends_on_sources, + null as exposure_type, + null as exposure_owner, + null as exposure_maturity, + source.value:source_name::string as source_name, + source.value:package_name::string as package_name, + source.value:relation_name::string as relation_name, + source.value:path::string as node_path, + null as checksum, + null as materialization, + -- Include the raw JSON for future proofing. + source.value as node_json + from {{ manifest_cte_name }} as manifests, + lateral flatten(input => manifests.data:sources) as source + +{% endmacro %} diff --git a/macros/flatten_results.sql b/macros/flatten_results.sql new file mode 100644 index 00000000..57323cea --- /dev/null +++ b/macros/flatten_results.sql @@ -0,0 +1,32 @@ +{% macro flatten_results(results_cte_name) %} + + select + run_results.command_invocation_id, + run_results.dbt_cloud_run_id, + run_results.artifact_run_id, + run_results.generated_at::timestamp_tz as artifact_generated_at, + run_results.data:args:which::string as execution_command, + coalesce(run_results.data:args:full_refresh, 'false')::boolean as was_full_refresh, + result.value:unique_id::string as node_id, + split(result.value:thread_id::string, '-')[1]::integer as thread_id, + result.value:status::string as status, + result.value:message::string as message, + + -- The first item in the timing array is the model-level `compile` + result.value:timing[0]:started_at::timestamp_tz as compile_started_at, + + -- The second item in the timing array is `execute`. + result.value:timing[1]:completed_at::timestamp_tz as query_completed_at, + + -- Confusingly, this does not match the delta of the above two timestamps. + -- should we calculate it instead? + coalesce(result.value:execution_time::float, 0) as total_node_runtime, + + result.value:adapter_response:rows_affected::int as rows_affected, + + -- Include the raw JSON for future proofing. 
+ result.value as result_json + from {{ results_cte_name }} as run_results, + lateral flatten(input => run_results.data:results) as result + +{% endmacro %} diff --git a/macros/upload_artifacts_v2.sql b/macros/upload_artifacts_v2.sql new file mode 100644 index 00000000..44667a0b --- /dev/null +++ b/macros/upload_artifacts_v2.sql @@ -0,0 +1,122 @@ +{% macro upload_dbt_artifacts_v2(prefix='target/') %} + +{# All main dbt commands produce both files and so set both by default #} +{% set filenames = ['manifest', 'run_results'] %} + +{% set artifact_stage = var('dbt_artifacts_stage', 'dbt_artifacts_stage') %} + +{% set src_results = source('dbt_artifacts', 'dbt_run_results') %} +{% set src_results_nodes = source('dbt_artifacts', 'dbt_run_results_nodes') %} +{% set src_manifest_nodes = source('dbt_artifacts', 'dbt_run_manifest_nodes') %} + +{% set remove_query %} + remove @{{ artifact_stage }} pattern='.*.json.gz'; +{% endset %} + +{% set results_query %} + + insert into {{ src_results }} + with raw_data as ( + + select + run_results.$1:metadata as metadata, + run_results.$1:args as args, + run_results.$1:elapsed_time::float as elapsed_time + from @{{ artifact_stage }} as run_results + + ) + + select + metadata:invocation_id::string as command_invocation_id, + -- NOTE: DBT_CLOUD_RUN_ID is case sensitive here + metadata:env:DBT_CLOUD_RUN_ID::int as dbt_cloud_run_id, + {{ make_artifact_run_id() }} as artifact_run_id, + metadata:generated_at::timestamp_tz as artifact_generated_at, + metadata:dbt_version::string as dbt_version, + metadata:env as env, + elapsed_time, + args:which::string as execution_command, + coalesce(args:full_refresh, 'false')::boolean as was_full_refresh, + args:models as selected_models, + args:target::string as target, + metadata, + args + from raw_data; + +{% endset %} + +{% set result_nodes_query %} + + insert into {{ src_results_nodes }} + with raw_data as ( + + select + run_results.$1:metadata as metadata, + run_results.$1 as data, + metadata:invocation_id::string as command_invocation_id, + -- NOTE: DBT_CLOUD_RUN_ID is case sensitive here + metadata:env:DBT_CLOUD_RUN_ID::int as dbt_cloud_run_id, + {{ make_artifact_run_id() }} as artifact_run_id, + metadata:generated_at::timestamp_tz as generated_at + from @{{ artifact_stage }} as run_results + + ) + + {{ flatten_results("raw_data") }}; + +{% endset %} + +{% set manifest_nodes_query %} + + insert into {{ src_manifest_nodes }} + with raw_data as ( + + select + manifests.$1:metadata as metadata, + metadata:invocation_id::string as command_invocation_id, + -- NOTE: DBT_CLOUD_RUN_ID is case sensitive here + metadata:env:DBT_CLOUD_RUN_ID::int as dbt_cloud_run_id, + {{ make_artifact_run_id() }} as artifact_run_id, + metadata:generated_at::timestamp_tz as generated_at, + manifests.$1 as data + from @{{ artifact_stage }} as manifests + + ) + + {{ flatten_manifest("raw_data") }}; + +{% endset %} + +{% do log("Clearing existing files from Stage: " ~ remove_query, info=True) %} +{% do run_query(remove_query) %} + +{% for filename in filenames %} + + {% set file = filename ~ '.json' %} + + {% set put_query %} + put file://{{ prefix }}{{ file }} @{{ artifact_stage }} auto_compress=true; + {% endset %} + + {% do log("Uploading " ~ file ~ " to Stage: " ~ put_query, info=True) %} + {% do run_query(put_query) %} + + {% if filename == 'run_results' %} + {% do log("Persisting unflattened results " ~ file ~ " from Stage: " ~ results_query, info=True) %} + {% do run_query(results_query) %} + {% do log("Persisting flattened results " ~ file ~ " 
from Stage: " ~ result_nodes_query, info=True) %} + {% do run_query(result_nodes_query) %} + + {% elif filename == 'manifest' %} + {% do log("Persisting flattened manifest nodes " ~ file ~ " from Stage: " ~ manifest_nodes_query, info=True) %} + {% do run_query(manifest_nodes_query) %} + + {% endif %} + + {% do log("Clearing new files from Stage: " ~ remove_query, info=True) %} + {% do run_query(remove_query) %} + +{% endfor %} + +{% endmacro %} + diff --git a/models/staging/sources.yml b/models/staging/sources.yml index 3eaa59cf..5c3e7bc7 100644 --- a/models/staging/sources.yml +++ b/models/staging/sources.yml @@ -8,7 +8,8 @@ sources: - name: artifacts identifier: "{{ var('dbt_artifacts_table', 'artifacts') }}" description: | - The source table containing loaded dbt artifacts. All of the artifacts must be loaded into this table. See the README for more info. + The source table containing loaded dbt artifacts. All of the artifacts must be loaded into this table + if using the V1 upload. See the README for more info. columns: - name: data description: A variant type object containing all the artifact's data. @@ -18,3 +19,61 @@ sources: description: The path of the artifact in the external stage. - name: artifact_type description: The type of the artifact, e.g. manifest.json + - name: dbt_run_results + identifier: "{{ var('dbt_artifacts_results_table', 'dbt_run_results') }}" + description: | + The source table containing the loaded metadata from run_results.json loaded artifacts. This belongs + to the V2 upload. See the README for more info. + columns: + - name: command_invocation_id + description: The id of the command which resulted in the source artifact's generation. + - name: artifact_run_id + description: A constructed ID to serve as a reliable identifier for a single run. + - name: artifact_generated_at + description: Timestamp of when the source artifact was generated. + - name: env + description: A JSON structure of the environment variables present during the run. + - name: execution_command + description: The actual command used. + - name: was_full_refresh + description: Was the run executed with a --full-refresh flag? + - name: metadata + description: Raw JSON structure of the results.metadata field. + - name: args + description: Raw JSON structure of the results.args field. + - name: dbt_run_results_nodes + identifier: "{{ var('dbt_artifacts_result_nodes_table', 'dbt_run_results_nodes') }}" + description: | + The source table containing the loaded and flattened results from run_results.json loaded artifacts. This belongs + to the V2 upload. See the README for more info. + columns: + - name: command_invocation_id + description: The id of the command which resulted in the source artifact's generation. + - name: artifact_run_id + description: A constructed ID to serve as a reliable identifier for a single run. + - name: artifact_generated_at + description: Timestamp of when the source artifact was generated. + - name: node_id + description: Unique id for the node, in the form of exposure.[package_name].[exposure_name] + - name: result_json + description: Raw JSON structure of the result node. + - name: dbt_run_manifest_nodes + identifier: "{{ var('dbt_artifacts_manifest_nodes_table', 'dbt_run_manifest_nodes') }}" + description: | + The source table containing the loaded and flattened nodes (including tests, seeds, sources, models and + exposures) from manifest.json loaded artifacts. 
Note that within the raw manifest file, exposures and sources + are stored seperately from seeds, models and tests, but for convenience they are all flattened to a single table + for this package. + + This belongs to the V2 upload. See the README for more info. + columns: + - name: command_invocation_id + description: The id of the command which resulted in the source artifact's generation. + - name: artifact_run_id + description: A constructed ID to serve as a reliable identifier for a single run. + - name: artifact_generated_at + description: Timestamp of when the source artifact was generated. + - name: node_id + description: Unique id for the node, in the form of exposure.[package_name].[exposure_name] + - name: node_json + description: Raw JSON structure of the manifest node. \ No newline at end of file diff --git a/models/staging/stg_dbt__artifacts.sql b/models/staging/stg_dbt__artifacts.sql index b1fa5cc3..8bda5eff 100644 --- a/models/staging/stg_dbt__artifacts.sql +++ b/models/staging/stg_dbt__artifacts.sql @@ -36,8 +36,7 @@ artifacts as ( select command_invocation_id, dbt_cloud_run_id, - -- This ID provides a reliable ID, regardless of whether running in a local or cloud environment. - sha2_hex(coalesce(dbt_cloud_run_id::string, command_invocation_id::string), 256) as artifact_run_id, + {{ make_artifact_run_id() }} as artifact_run_id, generated_at, path, artifact_type, diff --git a/models/staging/stg_dbt__node_executions.sql b/models/staging/stg_dbt__node_executions.sql index 1cc0a7d3..02371e9d 100644 --- a/models/staging/stg_dbt__node_executions.sql +++ b/models/staging/stg_dbt__node_executions.sql @@ -12,6 +12,13 @@ base_nodes as ( ), +base_v2 as ( + + select * + from {{ source('dbt_artifacts', 'dbt_run_results_nodes') }} + +), + run_results as ( select * @@ -22,31 +29,15 @@ run_results as ( fields as ( - select - run_results.command_invocation_id, - run_results.dbt_cloud_run_id, - run_results.artifact_run_id, - run_results.generated_at as artifact_generated_at, - run_results.data:args:which::string as execution_command, - coalesce(run_results.data:args:full_refresh, 'false')::boolean as was_full_refresh, - result.value:unique_id::string as node_id, - split(result.value:thread_id::string, '-')[1]::integer as thread_id, - result.value:status::string as status, - result.value:message::string as message, - - -- The first item in the timing array is the model-level `compile` - result.value:timing[0]:started_at::timestamp_ntz as compile_started_at, - - -- The second item in the timing array is `execute`. - result.value:timing[1]:completed_at::timestamp_ntz as query_completed_at, - - -- Confusingly, this does not match the delta of the above two timestamps. - -- should we calculate it instead? - coalesce(result.value:execution_time::float, 0) as total_node_runtime, - - result.value:adapter_response:rows_affected::int as rows_affected - from run_results, - lateral flatten(input => data:results) as result + -- V1 uploads + {{ flatten_results("run_results") }} + + union all + + -- V2 uploads + -- NB: We can safely select * because we know the schemas are the same + -- as they're made by the same macro. 
+ select * from base_v2 ), diff --git a/models/staging/stg_dbt__nodes.sql b/models/staging/stg_dbt__nodes.sql index 8e2e0e1a..89f1adce 100644 --- a/models/staging/stg_dbt__nodes.sql +++ b/models/staging/stg_dbt__nodes.sql @@ -5,6 +5,13 @@ with base as ( ), +base_v2 as ( + + select * + from {{ source('dbt_artifacts', 'dbt_run_manifest_nodes') }} + +), + manifests as ( select * @@ -15,81 +22,15 @@ manifests as ( flattened as ( - select - manifests.command_invocation_id, - manifests.dbt_cloud_run_id, - manifests.artifact_run_id, - manifests.generated_at as artifact_generated_at, - node.key as node_id, - node.value:resource_type::string as resource_type, - node.value:database::string as node_database, - node.value:schema::string as node_schema, - node.value:name::string as name, - to_array(node.value:depends_on:nodes) as depends_on_nodes, - null as depends_on_sources, - null as exposure_type, - null as exposure_owner, - null as exposure_maturity, - null as source_name, - node.value:package_name::string as package_name, - null as relation_name, - node.value:path::string as node_path, - node.value:checksum.checksum::string as checksum, - node.value:config.materialized::string as materialization - from manifests, - lateral flatten(input => data:nodes) as node + -- V1 uploads + {{ flatten_manifest("manifests") }} union all - select - manifests.command_invocation_id, - manifests.dbt_cloud_run_id, - manifests.artifact_run_id, - manifests.generated_at as artifact_generated_at, - exposure.key as node_id, - 'exposure' as resource_type, - null as node_database, - null as node_schema, - exposure.value:name::string as name, - to_array(exposure.value:depends_on:nodes) as depends_on_nodes, - to_array(exposure.value:sources:nodes) as depends_on_sources, - exposure.value:type::string as exposure_type, - exposure.value:owner:name::string as exposure_owner, - exposure.value:maturity::string as exposure_maturity, - null as source_name, - exposure.value:package_name::string as package_name, - null as relation_name, - null as node_path, - null as checksum, - null as materialization - from manifests, - lateral flatten(input => data:exposures) as exposure - - union all - - select - manifests.command_invocation_id, - manifests.dbt_cloud_run_id, - manifests.artifact_run_id, - manifests.generated_at as artifact_generated_at, - source.key as node_id, - 'source' as resource_type, - source.value:database::string as node_database, - source.value:schema::string as node_schema, - source.value:name::string::string as name, - null as depends_on_nodes, - null as depends_on_sources, - null as exposure_type, - null as exposure_owner, - null as exposure_maturity, - source.value:source_name::string as source_name, - source.value:package_name::string as package_name, - source.value:relation_name::string as relation_name, - source.value:path::string as node_path, - null as checksum, - null as materialization - from manifests, - lateral flatten(input => data:sources) as source + -- V2 uploads + -- NB: We can safely select * because we know the schemas are the same + -- as they're made by the same macro. 
+ select * from base_v2 ), diff --git a/models/staging/stg_dbt__run_results.sql b/models/staging/stg_dbt__run_results.sql index c1762ce4..47fd041b 100644 --- a/models/staging/stg_dbt__run_results.sql +++ b/models/staging/stg_dbt__run_results.sql @@ -5,26 +5,26 @@ with base as ( ), -run_results as ( +base_v2 as ( select * - from base - where artifact_type = 'run_results.json' + from {{ source('dbt_artifacts', 'dbt_run_results') }} ), -dbt_run as ( +run_results as ( select * - from run_results - where data:args:which in ('run', 'seed', 'snapshot', 'test') + from base + where artifact_type = 'run_results.json' ), fields as ( + -- V1 select - generated_at as artifact_generated_at, + generated_at::timestamp_tz as artifact_generated_at, command_invocation_id, dbt_cloud_run_id, artifact_run_id, @@ -35,7 +35,24 @@ fields as ( coalesce(data:args:full_refresh, 'false')::boolean as was_full_refresh, data:args:models as selected_models, data:args:target::string as target - from dbt_run + from run_results + + union all + + -- V2 + select + artifact_generated_at, + command_invocation_id, + dbt_cloud_run_id, + artifact_run_id, + dbt_version, + env, + elapsed_time, + execution_command, + was_full_refresh, + selected_models, + target + from base_v2 ) diff --git a/models/staging/stg_dbt__run_results_env_keys.sql b/models/staging/stg_dbt__run_results_env_keys.sql index 68dda260..c551a297 100644 --- a/models/staging/stg_dbt__run_results_env_keys.sql +++ b/models/staging/stg_dbt__run_results_env_keys.sql @@ -1,31 +1,23 @@ with base as ( select * - from {{ ref('stg_dbt__artifacts') }} - -), - -run_results as ( - - select * - from base - where artifact_type = 'run_results.json' + from {{ ref('stg_dbt__run_results') }} ), dbt_run as ( select * - from run_results - where data:args:which = 'run' + from base + where execution_command = 'run' ), env_keys as ( - select distinct env.key + select distinct env_key.key from dbt_run, - lateral flatten(input => data:metadata:env) as env + lateral flatten(input => env) as env_key -- Sort results to ensure things are deterministic order by 1 diff --git a/tox.ini b/tox.ini index 9c25d384..0ef6f7ef 100644 --- a/tox.ini +++ b/tox.ini @@ -94,6 +94,9 @@ commands = sqlfluff fix models --ignore parsing changedir = integration_test_project commands = dbt deps + + # V1 upload tests + dbt run-operation drop_test_schema # Make sure we've got a clean slate dbt run-operation create_artifact_resources dbt run-operation upload_dbt_artifacts --args '\{filenames: [manifest, run_results], prefix: artifacts/schema_v4/build/\}' dbt run-operation upload_dbt_artifacts --args '\{filenames: [manifest, run_results], prefix: artifacts/schema_v4/build_full_refresh/\}' @@ -106,3 +109,17 @@ commands = dbt build -s dbt_artifacts --full-refresh dbt build -s dbt_artifacts + + # V2 upload tests + dbt run-operation drop_test_schema # Make sure we've got a clean slate + dbt run-operation create_artifact_resources + dbt run-operation upload_dbt_artifacts_v2 --args '\{prefix: artifacts/schema_v4/build/\}' + dbt run-operation upload_dbt_artifacts_v2 --args '\{prefix: artifacts/schema_v4/build_full_refresh/\}' + dbt run-operation upload_dbt_artifacts_v2 --args '\{prefix: artifacts/schema_v4/run/\}' + dbt run-operation upload_dbt_artifacts_v2 --args '\{prefix: artifacts/schema_v4/run_full_refresh/\}' + dbt run-operation upload_dbt_artifacts_v2 --args '\{prefix: artifacts/schema_v4/seed/\}' + dbt run-operation upload_dbt_artifacts_v2 --args '\{prefix: artifacts/schema_v4/snapshot/\}' + dbt run-operation 
upload_dbt_artifacts_v2 --args '\{prefix: artifacts/schema_v4/test/\}' + + dbt build -s dbt_artifacts --full-refresh + dbt build -s dbt_artifacts
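
Usage note (not part of the patch above): the sketch below illustrates the kind of query the V2 flattened tables are intended to make cheap, since per-node timings land in typed columns at load time rather than being re-extracted from the raw JSON on every query. The fully qualified table name is an assumption based on the defaults created by `create_artifact_resources` (your target database, the `dbt_artifacts` schema and the `dbt_run_results_nodes` table); adjust it if you have overridden the `dbt_artifacts_*` variables.

```sql
-- Example only, against the assumed default object names: per-node execution
-- stats for the last 7 days, read straight from the flattened V2 table with
-- no lateral flatten of run_results JSON needed at query time.
select
    node_id,
    count(distinct artifact_run_id) as executions,
    avg(total_node_runtime) as avg_runtime_seconds,
    count_if(status not in ('success', 'pass')) as non_success_results
from your_db.dbt_artifacts.dbt_run_results_nodes  -- replace with your configured database/schema/table
where artifact_generated_at >= dateadd('day', -7, current_timestamp())
group by node_id
order by avg_runtime_seconds desc;
```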