From 63550bd00a7558a451e86e2e9ac573efa11b5c85 Mon Sep 17 00:00:00 2001 From: kgpayne Date: Fri, 5 Mar 2021 14:00:28 +0000 Subject: [PATCH] Add `sources` staging model (#5) * Added sources model and extracted additional columns from manifest json. * Dedupe artifacts, add docs and add dim_dbt__sources --- .gitignore | 4 ++ models/incremental/dim_dbt__models.sql | 3 +- models/incremental/dim_dbt__sources.sql | 38 +++++++++++++ models/schemas.yml | 76 +++++++++++++++++-------- models/staging/stg_dbt__artifacts.sql | 32 ++++++++++- models/staging/stg_dbt__models.sql | 10 ++-- models/staging/stg_dbt__sources.sql | 51 +++++++++++++++++ 7 files changed, 181 insertions(+), 33 deletions(-) create mode 100644 models/incremental/dim_dbt__sources.sql create mode 100644 models/staging/stg_dbt__sources.sql diff --git a/.gitignore b/.gitignore index dad33a45..e8e34a2d 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,7 @@ target/ dbt_modules/ logs/ + +.vscode/ +Pipfile +Pipfile.lock diff --git a/models/incremental/dim_dbt__models.sql b/models/incremental/dim_dbt__models.sql index a38fbd9d..5d77d096 100644 --- a/models/incremental/dim_dbt__models.sql +++ b/models/incremental/dim_dbt__models.sql @@ -25,8 +25,9 @@ fields as ( command_invocation_id, artifact_generated_at, node_id, - name, + model_database, model_schema, + name, depends_on_nodes, package_name, model_path, diff --git a/models/incremental/dim_dbt__sources.sql b/models/incremental/dim_dbt__sources.sql new file mode 100644 index 00000000..edc0309b --- /dev/null +++ b/models/incremental/dim_dbt__sources.sql @@ -0,0 +1,38 @@ +{{ config( materialized='incremental', unique_key='manifest_source_id' ) }} + +with dbt_sources as ( + + select * from {{ ref('stg_dbt__sources') }} + +), + +dbt_sources_incremental as ( + + select * + from dbt_sources + + {% if is_incremental() %} + -- this filter will only be applied on an incremental run + where artifact_generated_at > (select max(artifact_generated_at) from {{ this }}) + {% endif 
%} + +), + +fields as ( + + select + manifest_source_id, + command_invocation_id, + artifact_generated_at, + node_id, + name, + source_name, + source_schema, + package_name, + relation_name, + source_path + from dbt_sources_incremental + +) + +select * from fields diff --git a/models/schemas.yml b/models/schemas.yml index 4153d35d..f4779a5c 100644 --- a/models/schemas.yml +++ b/models/schemas.yml @@ -104,28 +104,54 @@ models: - name: env_* description: Columns for the environment variables set when the command was executed. - - name: dim_dbt__models - description: All dbt model metadata from every manifest.json. - columns: - - name: manifest_model_id - description: Primary key generated from the command_invocation_id and checksum. - tests: - - unique - - not_null - - name: command_invocation_id - description: The id of the command which resulted in the source artifact's generation. - - name: artifact_generated_at - description: Timestamp of when the source artifact was generated. - - name: node_id - description: Unique id for the node, in the form of model.[package_name].[model_name] - - name: name - description: The model name. - - name: model_schema - - name: depends_on_nodes - description: List of node ids the model depends on. - - name: package_name - - name: model_path - description: Filepath of the model. - - name: checksum - description: Unique identifier for the model. If a model is unchanged between separate executions this will remain the same. - - name: model_materialization + - name: dim_dbt__models + description: All dbt model metadata from every manifest.json. + columns: + - name: manifest_model_id + description: Primary key generated from the command_invocation_id and node_id. + tests: + - unique + - not_null + - name: command_invocation_id + description: The id of the command which resulted in the source artifact's generation. + - name: artifact_generated_at + description: Timestamp of when the source artifact was generated. 
+ - name: node_id + description: Unique id for the node, in the form of model.[package_name].[model_name] + - name: name + description: The model name. + - name: model_schema + - name: depends_on_nodes + description: List of node ids the model depends on. + - name: package_name + - name: model_path + description: Filepath of the model. + - name: checksum + description: Unique identifier for the model. If a model is unchanged between separate executions this will remain the same. + - name: model_materialization + + - name: dim_dbt__sources + description: All dbt source metadata from every manifest.json. + columns: + - name: manifest_source_id + description: Primary key generated from the command_invocation_id and node_id. + tests: + - unique + - not_null + - name: command_invocation_id + description: The id of the command which resulted in the source artifact's generation. + - name: artifact_generated_at + description: Timestamp of when the source artifact was generated. + - name: node_id + description: Unique id for the node, in the form of source.[package_name].[source_name].[table_name] + - name: name + description: The source node name. + - name: source_name + description: The name of the source. + - name: source_schema + - name: package_name + description: Package the source is defined in. + - name: relation_name + description: Name of the database entity this source resolved to. + - name: source_path + description: Filepath of the source. 
diff --git a/models/staging/stg_dbt__artifacts.sql b/models/staging/stg_dbt__artifacts.sql index 9dc9a7ad..f79937ec 100644 --- a/models/staging/stg_dbt__artifacts.sql +++ b/models/staging/stg_dbt__artifacts.sql @@ -8,12 +8,38 @@ with base as ( fields as ( select - data, + data:metadata:invocation_id::string as command_invocation_id, generated_at, path, - artifact_type + artifact_type, + data from base +), + +deduped as ( + + select + *, + row_number() over ( + partition by command_invocation_id, artifact_type + order by generated_at desc + ) as index + from fields + qualify index = 1 + +), + +artifacts as ( + + select + command_invocation_id, + generated_at, + path, + artifact_type, + data + from deduped + ) -select * from fields +select * from artifacts \ No newline at end of file diff --git a/models/staging/stg_dbt__models.sql b/models/staging/stg_dbt__models.sql index 7e72953a..ba92a872 100644 --- a/models/staging/stg_dbt__models.sql +++ b/models/staging/stg_dbt__models.sql @@ -16,11 +16,12 @@ manifests as ( flatten as ( select - data:metadata:invocation_id::string as command_invocation_id, + command_invocation_id, generated_at as artifact_generated_at, node.key as node_id, - node.value:name::string as name, + node.value:database::string as model_database, node.value:schema::string as model_schema, + node.value:name::string as name, to_array(node.value:depends_on:nodes) as depends_on_nodes, node.value:package_name::string as package_name, node.value:path::string as model_path, @@ -35,12 +36,13 @@ flatten as ( surrogate_key as ( select - {{ dbt_utils.surrogate_key(['command_invocation_id', 'checksum']) }} as manifest_model_id, + {{ dbt_utils.surrogate_key(['command_invocation_id', 'node_id']) }} as manifest_model_id, command_invocation_id, artifact_generated_at, node_id, - name, + model_database, model_schema, + name, depends_on_nodes, package_name, model_path, diff --git a/models/staging/stg_dbt__sources.sql b/models/staging/stg_dbt__sources.sql new file mode 
100644 index 00000000..34a53f10 --- /dev/null +++ b/models/staging/stg_dbt__sources.sql @@ -0,0 +1,51 @@ +with base as ( + + select * + from {{ ref('stg_dbt__artifacts') }} + +), + +manifests as ( + + select * + from base + where artifact_type = 'manifest.json' + +), + +flatten as ( + + select + command_invocation_id, + generated_at as artifact_generated_at, + node.key as node_id, + node.value:name::string as name, + node.value:source_name::string as source_name, + node.value:schema::string as source_schema, + node.value:package_name::string as package_name, + node.value:relation_name::string as relation_name, + node.value:path::string as source_path + from manifests, + lateral flatten(input => data:sources) as node + where node.value:resource_type = 'source' + +), + +surrogate_key as ( + + select + {{ dbt_utils.surrogate_key(['command_invocation_id', 'node_id']) }} as manifest_source_id, + command_invocation_id, + artifact_generated_at, + node_id, + name, + source_name, + source_schema, + package_name, + relation_name, + source_path + from flatten + +) + +select * from surrogate_key