Skip to content

Commit

Permalink
Add more resource types into package (#1)
Browse files Browse the repository at this point in the history
* Add seed, snapshot, test dimensions and executions

* Change run_results to include seed, snapshot, test

* Update README with models

* Update attribute names to be resource-specific

* Update resource columns in execution models

* Pass depends_on_nodes to fct model
  • Loading branch information
stkbailey authored Sep 5, 2021
1 parent 27d81ab commit 77856cb
Show file tree
Hide file tree
Showing 16 changed files with 579 additions and 19 deletions.
12 changes: 9 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@ This package builds a mart of tables from dbt artifacts loaded into a table. It
Models included:

- `dim_dbt__models`
- `fct_dbt__model_executions`
- `fct_dbt__latest_full_model_executions`
- `dim_dbt__seeds`
- `dim_dbt__snapshots`
- `dim_dbt__tests`
- `fct_dbt__critical_path`
- `fct_dbt_run_results`
- `fct_dbt__latest_full_model_executions`
- `fct_dbt__model_executions`
- `fct_dbt__run_results`
- `fct_dbt__seed_executions`
- `fct_dbt__snapshot_executions`
- `fct_dbt__test_executions`

The critical path model determines the slowest route through your DAG, which provides you with the information needed to make a targeted effort to reducing `dbt run` times. For example:

Expand Down
12 changes: 6 additions & 6 deletions models/incremental/dim_dbt__exposures.sql
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
{{
config(
materialized='incremental',
unique_key='manifest_model_id'
unique_key='manifest_exposure_id'
)
}}

with dbt_models as (
with dbt_exposures as (

select * from {{ ref('stg_dbt__exposures') }}

),

dbt_models_incremental as (
dbt_exposures_incremental as (

select *
from dbt_models
from dbt_exposures

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
Expand All @@ -26,7 +26,7 @@ dbt_models_incremental as (
fields as (

select
t.manifest_model_id,
t.manifest_exposure_id,
t.command_invocation_id,
t.dbt_cloud_run_id,
t.artifact_generated_at,
Expand All @@ -37,7 +37,7 @@ fields as (
t.maturity,
f.value::string as output_feeds,
t.package_name
from dbt_models_incremental as t,
from dbt_exposures_incremental as t,
lateral flatten(input => depends_on_nodes) as f

)
Expand Down
40 changes: 40 additions & 0 deletions models/incremental/dim_dbt__seeds.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{{ config( materialized='incremental', unique_key='manifest_seed_id' ) }}

with dbt_seeds as (

select * from {{ ref('stg_dbt__seeds') }}

),

dbt_seeds_incremental as (

select *
from dbt_seeds

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}

),

fields as (

select
manifest_seed_id,
command_invocation_id,
dbt_cloud_run_id,
artifact_generated_at,
node_id,
seed_database,
seed_schema,
name,
depends_on_nodes,
package_name,
seed_path,
checksum
from dbt_seeds_incremental

)

select * from fields
40 changes: 40 additions & 0 deletions models/incremental/dim_dbt__snapshots.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{{ config( materialized='incremental', unique_key='manifest_snapshot_id' ) }}

with dbt_snapshots as (

select * from {{ ref('stg_dbt__snapshots') }}

),

dbt_snapshots_incremental as (

select *
from dbt_snapshots

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}

),

fields as (

select
manifest_snapshot_id,
command_invocation_id,
dbt_cloud_run_id,
artifact_generated_at,
node_id,
snapshot_database,
snapshot_schema,
name,
depends_on_nodes,
package_name,
snapshot_path,
checksum
from dbt_snapshots_incremental

)

select * from fields
37 changes: 37 additions & 0 deletions models/incremental/dim_dbt__tests.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{{ config( materialized='incremental', unique_key='manifest_test_id' ) }}

with dbt_tests as (

select * from {{ ref('stg_dbt__tests') }}

),

dbt_tests_incremental as (

select *
from dbt_tests

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}

),

fields as (

select
manifest_test_id,
command_invocation_id,
dbt_cloud_run_id,
artifact_generated_at,
node_id,
name,
depends_on_nodes,
package_name,
test_path
from dbt_tests_incremental

)

select * from fields
66 changes: 66 additions & 0 deletions models/incremental/fct_dbt__seed_executions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
{{ config( materialized='incremental', unique_key='seed_execution_id' ) }}

with seeds as (

select *
from {{ ref('dim_dbt__seeds') }}

),

seed_executions as (

select *
from {{ ref('stg_dbt__seed_executions') }}

),

seed_executions_incremental as (

select *
from seed_executions

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}

),

seed_executions_with_materialization as (

select
seed_executions_incremental.*,
seeds.seed_schema,
seeds.name
from seed_executions_incremental
left join seeds on
(
seed_executions_incremental.command_invocation_id = seeds.command_invocation_id
or seed_executions_incremental.dbt_cloud_run_id = seeds.dbt_cloud_run_id
)
and seed_executions_incremental.node_id = seeds.node_id

),

fields as (

select
seed_execution_id,
command_invocation_id,
dbt_cloud_run_id,
artifact_generated_at,
was_full_refresh,
node_id,
thread_id,
status,
compile_started_at,
query_completed_at,
total_node_runtime,
rows_affected,
seed_schema,
name
from seed_executions_with_materialization

)

select * from fields
66 changes: 66 additions & 0 deletions models/incremental/fct_dbt__snapshot_executions.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
{{ config( materialized='incremental', unique_key='snapshot_execution_id' ) }}

with snapshots as (

select *
from {{ ref('dim_dbt__snapshots') }}

),

snapshot_executions as (

select *
from {{ ref('stg_dbt__snapshot_executions') }}

),

snapshot_executions_incremental as (

select *
from snapshot_executions

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
where artifact_generated_at > (select max(artifact_generated_at) from {{ this }})
{% endif %}

),

snapshot_executions_with_materialization as (

select
snapshot_executions_incremental.*,
snapshots.snapshot_schema,
snapshots.name
from snapshot_executions_incremental
left join snapshots on
(
snapshot_executions_incremental.command_invocation_id = snapshots.command_invocation_id
or snapshot_executions_incremental.dbt_cloud_run_id = snapshots.dbt_cloud_run_id
)
and snapshot_executions_incremental.node_id = snapshots.node_id

),

fields as (

select
snapshot_execution_id,
command_invocation_id,
dbt_cloud_run_id,
artifact_generated_at,
was_full_refresh,
node_id,
thread_id,
status,
compile_started_at,
query_completed_at,
total_node_runtime,
rows_affected,
snapshot_schema,
name
from snapshot_executions_with_materialization

)

select * from fields
12 changes: 6 additions & 6 deletions models/incremental/fct_dbt__test_executions.sql
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
{{ config( materialized='incremental', unique_key='model_execution_id' ) }}
{{ config( materialized='incremental', unique_key='test_execution_id' ) }}

with model_executions as (
with test_executions as (

select *
from {{ ref('stg_dbt__test_executions') }}

),

model_executions_incremental as (
test_executions_incremental as (

select *
from model_executions
from test_executions

{% if is_incremental() %}
-- this filter will only be applied on an incremental run
Expand All @@ -22,7 +22,7 @@ model_executions_incremental as (
fields as (

select
model_execution_id,
test_execution_id,
command_invocation_id,
dbt_cloud_run_id,
artifact_generated_at,
Expand All @@ -34,7 +34,7 @@ fields as (
query_completed_at,
total_node_runtime,
rows_affected
from model_executions_incremental
from test_executions_incremental

)

Expand Down
2 changes: 1 addition & 1 deletion models/staging/stg_dbt__exposures.sql
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ flatten as (
surrogate_key as (

select
{{ dbt_utils.surrogate_key(['command_invocation_id', 'node_id']) }} as manifest_model_id,
{{ dbt_utils.surrogate_key(['command_invocation_id', 'node_id']) }} as manifest_exposure_id,
command_invocation_id,
dbt_cloud_run_id,
artifact_generated_at,
Expand Down
2 changes: 1 addition & 1 deletion models/staging/stg_dbt__run_results.sql
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ dbt_run as (

select *
from run_results
where data:args:which = 'run'
where data:args:which in ('run', 'seed', 'snapshot', 'test')

),

Expand Down
Loading

0 comments on commit 77856cb

Please sign in to comment.