Merge pull request #31 from fivetran/feature/audit-incrementality
Audit table incrementality + Postgres compatibility
fivetran-jamie authored Jan 14, 2022
2 parents 1a7b6a6 + 85b62c3 commit 0b06213
Showing 20 changed files with 137 additions and 33 deletions.
14 changes: 14 additions & 0 deletions .circleci/config.yml
@@ -34,6 +34,7 @@ jobs:
dbt deps
dbt seed --target redshift --full-refresh
dbt run --target redshift --full-refresh
dbt run --target redshift
dbt test --target redshift
- run:
name: "Run Tests - Snowflake"
@@ -44,6 +45,7 @@
dbt deps
dbt seed --target snowflake --full-refresh
dbt run --target snowflake --full-refresh
dbt run --target snowflake
dbt test --target snowflake
- run:
name: "Run Tests - BigQuery"
@@ -57,7 +59,19 @@
dbt deps
dbt seed --target bigquery --full-refresh
dbt run --target bigquery --full-refresh
dbt run --target bigquery
dbt test --target bigquery
- run:
name: "Run Tests - Postgres"
command: |
. venv/bin/activate
echo `pwd`
cd integration_tests
dbt deps
dbt seed --target postgres --full-refresh
dbt run --target postgres --full-refresh
dbt run --target postgres
dbt test --target postgres
- save_cache:
key: deps2-{{ .Branch }}
paths:
14 changes: 14 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,17 @@
# dbt_fivetran_log v0.5.0
🎉 Official dbt v1.0.0 Compatibility Release 🎉
## 🚨 Breaking Changes 🚨
- Adjusts the `require-dbt-version` to now be within the range [">=1.0.0", "<2.0.0"]. Additionally, the package has been updated for dbt v1.0.0 compatibility. If you are using a dbt version <1.0.0, you will need to upgrade in order to leverage the latest version of the package.
- For help upgrading your package, we recommend reviewing this GitHub repo's release notes to see what has changed since your last upgrade.
- For help upgrading your dbt project to dbt v1.0.0, we recommend reviewing dbt Labs' [upgrading to 1.0.0 docs](https://docs.getdbt.com/docs/guides/migration-guide/upgrading-to-1-0-0) for details on the changes required.
- Upgrades the package dependency to refer to the latest `dbt_fivetran_utils`. The latest `dbt_fivetran_utils` package also has a dependency on `dbt_utils` [">=0.8.0", "<0.9.0"].
- Please note, if you are installing a version of `dbt_utils` in your `packages.yml` that is not in the range above then you will encounter a package dependency error.

## Additional Features
- Materializes the `fivetran_log__audit_table` incrementally and employs partitioning for BigQuery users. As a non-incremental table, this model previously incurred high runtimes for some users ([#27](https://github.com/fivetran/dbt_fivetran_log/issues/27))
- If you would like to apply partitioning to the underlying source tables (i.e., the `LOG` table), refer to the [Fivetran docs](https://fivetran.com/docs/destinations/bigquery/partition-table) on how to do so.
- Expands compatibility to Postgres!

# dbt_fivetran_log v0.5.0-b1
🎉 dbt v1.0.0 Compatibility Pre-Release 🎉 An official dbt v1.0.0 compatible version of the package will be released once existing feature/bug PRs are merged.
## 🚨 Breaking Changes 🚨
11 changes: 8 additions & 3 deletions README.md
@@ -34,7 +34,7 @@ The package's main goals are to:


## Installation Instructions
`dbt_fivetran_log` currently supports `dbt 0.20.x`.
`dbt_fivetran_log` currently supports `dbt 1.0.x`.

Check [dbt Hub](https://hub.getdbt.com/) for the latest installation instructions, or [read the dbt docs](https://docs.getdbt.com/docs/package-management) for more information on installing packages.

Expand All @@ -43,7 +43,7 @@ Include in your `packages.yml`
```yaml
packages:
- package: fivetran/fivetran_log
version: 0.5.0-b1
version: 0.5.0
```
## Package Maintenance
@@ -111,13 +111,18 @@ models:

*Read more about using custom schemas in dbt [here](https://docs.getdbt.com/docs/building-a-dbt-project/building-models/using-custom-schemas).*

### Partitioning Source Tables in BigQuery
By default, the `fivetran_log__audit_table` is materialized as an incremental table and, on BigQuery, partitioned by day on `sync_start`. BigQuery users may also want to add partitions to large underlying source tables, such as the `LOG` table, to optimize performance and query costs.

If you would like to apply partitioning to _source_ tables, please refer to the [Fivetran docs](https://fivetran.com/docs/destinations/bigquery/partition-table) for how to do so.
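
As a rough, hypothetical illustration (the project and dataset names below are placeholders, and the partition column is assumed), a date filter on a partitioned `LOG` table lets BigQuery prune partitions instead of scanning the whole table:

```sql
-- Hypothetical query against a LOG table partitioned on time_stamp;
-- `my-project` and the `fivetran_log` dataset are placeholder names.
select id, connector_id, event, message_event, time_stamp
from `my-project.fivetran_log.log`
-- filtering on the assumed partition column enables partition pruning
where time_stamp >= timestamp_sub(current_timestamp(), interval 7 day)
```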

## Contributions
Don't see a model or specific metric you would like included? Notice any bugs when installing
or running the package? If so, we highly encourage and welcome contributions to this package!
Please create issues or open PRs against `main`. See [the Discourse post](https://discourse.getdbt.com/t/contributing-to-a-dbt-package/657) for information on how to contribute to a package.

## Database Support
This package has been tested on BigQuery, Snowflake and Redshift.
This package has been tested on BigQuery, Snowflake, Redshift, and Postgres.

## Resources:
- Provide [feedback](https://www.surveymonkey.com/r/DQ7K7WW) on our existing dbt packages or what you'd like to see next
2 changes: 1 addition & 1 deletion docs/catalog.json

Large diffs are not rendered by default.

Binary file removed docs/graph.gpickle
16 changes: 8 additions & 8 deletions docs/index.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/manifest.json

Large diffs are not rendered by default.

Binary file removed docs/partial_parse.msgpack
Binary file removed docs/partial_parse.pickle
1 change: 0 additions & 1 deletion docs/run_results.json

This file was deleted.

Binary file modified integration_tests/.DS_Store
9 changes: 9 additions & 0 deletions integration_tests/ci/sample.profiles.yml
@@ -34,4 +34,13 @@ integration_tests:
database: "{{ env_var('CI_SNOWFLAKE_DBT_DATABASE') }}"
warehouse: "{{ env_var('CI_SNOWFLAKE_DBT_WAREHOUSE') }}"
schema: fivetran_log_integration_tests
threads: 8
postgres:
type: postgres
host: "{{ env_var('CI_POSTGRES_DBT_HOST') }}"
user: "{{ env_var('CI_POSTGRES_DBT_USER') }}"
pass: "{{ env_var('CI_POSTGRES_DBT_PASS') }}"
dbname: "{{ env_var('CI_POSTGRES_DBT_DBNAME') }}"
port: 5432
schema: fivetran_log_integration_tests
threads: 8
4 changes: 2 additions & 2 deletions integration_tests/dbt_project.yml
@@ -1,5 +1,5 @@
name: 'fivetran_log_integration_tests'
version: '0.4.1'
version: '0.5.0'
config-version: 2
profile: 'integration_tests'

@@ -58,7 +58,7 @@ seeds:
destination_id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
trigger_table:
+quote_columns: "{{ true if target.type == 'redshift' else false }}"
+quote_columns: "{{ true if target.type in ('redshift', 'postgres') else false }}"
+enabled: "{{ true if target.type != 'snowflake' else false }}"
+column_types:
transformation_id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
4 changes: 2 additions & 2 deletions integration_tests/seeds/connector.csv
@@ -1,3 +1,3 @@
connector_id,_fivetran_synced,connecting_user_id,connector_name,connector_type,destination_id,paused,signed_up
exceedingly_twenty,2021-03-12 21:03:45.994,,s3.s3_testing,s3,hammer_consternation,true,2020-08-31 19:00:22.913630
intrinsic_departed,2021-03-12 21:03:45.991,off_ence,s3.patrick_star,s3,12i5abmnmoyfb,true,2019-03-22 18:49:12.833910
exceedingly_twenty,2021-03-12 21:03:45.994,,s3.s3_testing,s3,ups_trampoline,true,2020-08-31 19:00:22.913630
intrinsic_departed,2021-03-12 21:03:45.991,off_ence,s3.patrick_star,s3,television_trace,true,2019-03-22 18:49:12.833910
22 changes: 22 additions & 0 deletions integration_tests/seeds/log.csv
@@ -1,3 +1,25 @@
id,time_stamp,_fivetran_synced,connector_id,event,message_data,message_event,transformation_id
legislative_lazy,2021-02-13 13:34:03.343,2021-02-16 21:11:07.065,legislative_lazy,WARNING,"{""type"":""table_excluded_by_system"",""message"":""salesforce.affectlayer__AffectLayer_User_Setting__ChangeEvent has been Excluded by system. Reason : Not queryable""}",warning,
paint_wedges,2021-02-12 08:15:05.555,2021-02-16 21:48:51.493,paint_wedges,INFO,"{""method"":""GET"",""uri"":""https://google.com""}",api_call,
intrinsic_departed,2021-12-09 14:27:00.504,2021-12-09 20:30:53.959,intrinsic_departed,INFO,,sync_end,
intrinsic_departed,2021-12-09 14:26:52.433,2021-12-09 20:30:53.957,intrinsic_departed,INFO,"{""schema"":""instagram_business"",""count"":4,""operationType"":""REPLACED_OR_INSERTED"",""table"":""fivetran_audit""}",records_modified,
intrinsic_departed,2021-12-09 14:26:52.242,2021-12-09 20:30:53.955,intrinsic_departed,INFO,"{""table"":""fivetran_audit""}",write_to_table_end,
intrinsic_departed,2021-12-09 14:26:51.703,2021-12-09 20:30:53.955,intrinsic_departed,INFO,"{""table"":""fivetran_audit""}",write_to_table_end,
intrinsic_departed,2021-12-09 14:26:44,2021-12-09 20:30:53.952,intrinsic_departed,INFO,"{""table"":""fivetran_audit""}",write_to_table_start,
intrinsic_departed,2021-12-09 14:26:42.084,2021-12-09 20:30:53.950,intrinsic_departed,INFO,"{""schema"":""instagram_business"",""count"":9,""operationType"":""REPLACED_OR_INSERTED"",""table"":""media_history""}",records_modified,
intrinsic_departed,2021-12-09 14:26:42.029,2021-12-09 20:30:53.949,intrinsic_departed,INFO,"{""schema"":""instagram_business"",""count"":5,""operationType"":""REPLACED_OR_INSERTED"",""table"":""media_insights""}",records_modified,
intrinsic_departed,2021-12-09 14:26:41.886,2021-12-09 20:30:53.948,intrinsic_departed,INFO,"{""table"":""media_history""}",write_to_table_end,
intrinsic_departed,2021-12-09 14:26:41.875,2021-12-09 20:30:53.948,intrinsic_departed,INFO,"{""table"":""media_insights""}",write_to_table_end,
intrinsic_departed,2021-12-09 14:26:40.880,2021-12-09 20:30:53.946,intrinsic_departed,INFO,"{""table"":""media_history""}",write_to_table_end,
intrinsic_departed,2021-12-09 14:26:40.737,2021-12-09 20:30:53.944,intrinsic_departed,INFO,"{""table"":""media_insights""}",write_to_table_end,
intrinsic_departed,2021-12-09 14:26:39.469,2021-12-09 20:30:53.935,intrinsic_departed,INFO,"{""schema"":""instagram_business"",""count"":1,""operationType"":""REPLACED_OR_INSERTED"",""table"":""user_history""}",records_modified,
intrinsic_departed,2021-12-09 14:26:39.446,2021-12-09 20:30:53.932,intrinsic_departed,INFO,"{""schema"":""instagram_business"",""count"":3,""operationType"":""REPLACED_OR_INSERTED"",""table"":""user_insights""}",records_modified,
intrinsic_departed,2021-12-09 14:26:39.092,2021-12-09 20:30:53.930,intrinsic_departed,INFO,"{""table"":""user_insights""}",write_to_table_end,
intrinsic_departed,2021-12-09 14:26:39.087,2021-12-09 20:30:53.929,intrinsic_departed,INFO,"{""table"":""user_history""}",write_to_table_end,
intrinsic_departed,2021-12-09 14:26:37.735,2021-12-09 20:30:53.928,intrinsic_departed,INFO,"{""table"":""user_history""}",write_to_table_end,
intrinsic_departed,2021-12-09 14:26:37.691,2021-12-09 20:30:53.920,intrinsic_departed,INFO,"{""table"":""user_insights""}",write_to_table_end,
intrinsic_departed,2021-12-09 14:26:29.860,2021-12-09 20:30:53.904,intrinsic_departed,INFO,"{""table"":""media_history""}",write_to_table_start,
intrinsic_departed,2021-12-09 14:26:29.814,2021-12-09 20:30:53.903,intrinsic_departed,INFO,"{""table"":""user_insights""}",write_to_table_start,
intrinsic_departed,2021-12-09 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""table"":""user_history""}",write_to_table_start,
intrinsic_departed,2021-12-09 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""table"":""media_insights""}",write_to_table_start,
intrinsic_departed,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,,sync_start,
12 changes: 5 additions & 7 deletions models/fivetran_log.yml
@@ -245,14 +245,12 @@ models:
captures timestamps and row quantities around table syncs. Each record represents a table being
written to in a connector sync. Note: the quantities here will not match the deprecated `fivetran_audit`
table completely, as `fivetran_audit` reported on pre-duplicated data loads.
tests:
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- connector_id
- destination_id
- table_name
- write_to_table_start
columns:
- name: unique_table_sync_key
description: Primary key of this table, hashed on `connector_id`, `destination_id`, `table_name`, and `write_to_table_start`.
tests:
- unique
- not_null
- name: connector_id
description: System generated unique ID of the connector.
- name: connector_name
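Note: the new `unique_table_sync_key` column above is generated with `dbt_utils.surrogate_key` (see the model change below). As a rough sketch, that macro typically compiles to an md5 hash over the null-coalesced, concatenated columns; the exact SQL varies by adapter and dbt_utils version, so treat this as illustrative only:

```sql
select
    -- illustrative expansion of dbt_utils.surrogate_key over the four columns
    md5(
        coalesce(cast(connector_id as varchar), '') || '-' ||
        coalesce(cast(destination_id as varchar), '') || '-' ||
        coalesce(cast(table_name as varchar), '') || '-' ||
        coalesce(cast(write_to_table_start as varchar), '')
    ) as unique_table_sync_key
from sum_records_modified
```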
45 changes: 43 additions & 2 deletions models/fivetran_log__audit_table.sql
@@ -1,3 +1,15 @@
{{ config(
materialized='incremental',
unique_key='unique_table_sync_key',
partition_by={
'field': 'sync_start',
'data_type': 'timestamp',
'granularity': 'day'
} if target.type == 'bigquery' else none,
incremental_strategy = 'merge',
file_format = 'delta'
) }}

with sync_log as (

select
@@ -7,13 +19,32 @@ with sync_log as (
from {{ ref('stg_fivetran_log__log') }}

where event_subtype in ('sync_start', 'sync_end', 'write_to_table_start', 'write_to_table_end', 'records_modified')

{% if is_incremental() %}

-- Capture the latest timestamp in a call statement instead of a subquery, to reduce BigQuery costs on incremental runs
{%- call statement('max_sync_start', fetch_result=True) -%}
select date(max(sync_start)) from {{ this }}
{%- endcall -%}

-- load the result from the above query into a new variable
{%- set query_result = load_result('max_sync_start') -%}

-- the query result comes back as a table of rows, so pull out the single value we need
{%- set max_sync_start = query_result['data'][0][0] -%}

-- compare the new batch of data to the latest sync already stored in this model
and date(created_at) >= '{{ max_sync_start }}'

{% endif %}
),


connector as (

select *
from {{ ref('fivetran_log__connector_status') }}

),

add_connector_info as (
@@ -106,8 +137,18 @@ sum_records_modified as (
and records_modified_log.created_at > limit_to_table_starts.sync_start
and records_modified_log.created_at < coalesce(limit_to_table_starts.sync_end, limit_to_table_starts.next_sync_start)

group by 1,2,3,4,5,6,7,8,9
{{ dbt_utils.group_by(n=9) }}

),

surrogate_key as (

select
*,
{{ dbt_utils.surrogate_key(['connector_id', 'destination_id', 'table_name', 'write_to_table_start']) }} as unique_table_sync_key

from sum_records_modified
)

select *
from sum_records_modified
from surrogate_key
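On an incremental run, the Jinja block above resolves to a plain date comparison. A hypothetical compiled form of the `sync_log` CTE (the schema name and the date literal are assumed values, and the column list is elided to `*`):

```sql
-- Hypothetical compiled output of the incremental filter, assuming the
-- previous run's max(sync_start) resolved to 2021-12-09.
select *
from my_schema.stg_fivetran_log__log
where event_subtype in ('sync_start', 'sync_end', 'write_to_table_start', 'write_to_table_end', 'records_modified')
  -- only scan log rows created on or after the latest sync date already in the model
  and date(created_at) >= '2021-12-09'
```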
4 changes: 2 additions & 2 deletions models/fivetran_log__connector_daily_events.sql
@@ -74,7 +74,7 @@ spine as (

{{ dbt_utils.date_spine(
datepart = "day",
start_date = "'" ~ first_date[0:10] ~ "'",
start_date = "cast('" ~ first_date[0:10] ~ "' as date)",
end_date = dbt_utils.dateadd("week", 1, dbt_utils.date_trunc('day', dbt_utils.current_timestamp()))
)
}}
@@ -140,4 +140,4 @@ final as (
)

select *
from final
from final
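The `start_date` cast above matters mainly on Postgres, where a bare string literal in the spine's date arithmetic can be treated as text rather than a date. A minimal sketch of the pattern, with hypothetical dates:

```sql
-- Casting start_date pins its type to date, which Postgres may not
-- otherwise infer; the dates here are placeholders.
{{ dbt_utils.date_spine(
    datepart = "day",
    start_date = "cast('2021-01-01' as date)",
    end_date = "cast('2021-02-01' as date)"
) }}
```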
8 changes: 4 additions & 4 deletions models/fivetran_log__connector_status.sql
@@ -70,7 +70,7 @@ connector_metrics as (
from connector
left join connector_log
on connector_log.connector_id = connector.connector_id
group by 1,2,3,4,5,6
{{ dbt_utils.group_by(n=6) }}

),

Expand Down Expand Up @@ -133,7 +133,7 @@ connector_recent_logs as (
and {{ fivetran_utils.json_extract(string="connector_log.message_data", string_path="status") }} ='RESCHEDULED'
and {{ fivetran_utils.json_extract(string="connector_log.message_data", string_path="reason") }} like '%intended behavior%')

group by 1,2,3,4,5,6,7,8,9,10,11 -- de-duping error messages
{{ dbt_utils.group_by(n=11) }} -- de-duping error messages


),
@@ -153,8 +153,8 @@ final as (
coalesce(schema_changes.number_of_schema_changes_last_month, 0) as number_of_schema_changes_last_month

{% if var('fivetran_log_using_sync_alert_messages', true) %}
, {{ fivetran_utils.string_agg("case when connector_recent_logs.event_type = 'SEVERE' then connector_recent_logs.message_data else null end", "'\\n'") }} as errors_since_last_completed_sync
, {{ fivetran_utils.string_agg("case when connector_recent_logs.event_type = 'WARNING' then connector_recent_logs.message_data else null end", "'\\n'") }} as warnings_since_last_completed_sync
, {{ fivetran_utils.string_agg("distinct case when connector_recent_logs.event_type = 'SEVERE' then connector_recent_logs.message_data else null end", "'\\n'") }} as errors_since_last_completed_sync
, {{ fivetran_utils.string_agg("distinct case when connector_recent_logs.event_type = 'WARNING' then connector_recent_logs.message_data else null end", "'\\n'") }} as warnings_since_last_completed_sync
{% endif %}

from connector_recent_logs
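The added `distinct` de-duplicates repeated log messages before they are concatenated. As a rough sketch of what the `fivetran_utils.string_agg` call might compile to on Postgres, treating the `connector_recent_logs` CTE as a relation (other warehouses may use `listagg` or different escape syntax; this is illustrative only):

```sql
select
    connector_id,
    -- distinct drops duplicate messages before concatenation;
    -- E'\n' is Postgres escape syntax for a newline delimiter
    string_agg(distinct case when event_type = 'SEVERE' then message_data end, E'\n')
        as errors_since_last_completed_sync
from connector_recent_logs
group by connector_id
```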
2 changes: 2 additions & 0 deletions models/staging/stg_fivetran_log__trigger_table.sql
@@ -11,6 +11,8 @@ fields as (
select
{% if target.type == 'bigquery' %}
table as trigger_table,
{% elif target.type == 'postgres' %}
"table" as trigger_table,
{% else %}
"TABLE" as trigger_table,
{% endif %}
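The new branch is needed because `table` is a reserved word and identifier case-folding differs by warehouse: Snowflake stores unquoted identifiers in uppercase (so the existing `"TABLE"` branch resolves there), while Postgres folds them to lowercase. A minimal Postgres illustration (the schema name is a placeholder):

```sql
-- On Postgres the column name is stored lowercase, so the reserved word
-- must be double-quoted in lowercase; "TABLE" would reference a
-- different identifier.
select "table" as trigger_table
from fivetran_log.trigger_table;
```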
