Merge pull request #114 from fivetran/bug/redshift-json-parse
Bug/redshift json parse
fivetran-catfritz authored Feb 20, 2024
2 parents a1a4662 + 56b1214 commit 1627958
Showing 18 changed files with 153 additions and 52 deletions.
2 changes: 1 addition & 1 deletion .buildkite/pipeline.yml
@@ -58,7 +58,7 @@ steps:
commands: |
bash .buildkite/scripts/run_models.sh redshift
- label: ":bricks: Run Tests - Databricks"
- label: ":databricks: Run Tests - Databricks"
key: "run_dbt_databricks"
plugins:
- docker#v3.13.0:
24 changes: 24 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,27 @@
# dbt_fivetran_log v1.5.0
[PR #114](https://github.com/fivetran/dbt_fivetran_log/pull/114) includes the following updates:

## Breaking Changes
> ⚠️ Since the following changes are breaking, we recommend running a `--full-refresh` after upgrading to this version.
- For Bigquery and Databricks destinations, updated the `partition_by` config to coordinate with the filter used in the incremental logic.
- For Snowflake destinations, added a `cluster_by` config for performance.
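
For reference, the updated config block on `fivetran_platform__audit_table` (shown in full in the model diff further down) looks like this after the change; the inline comments are editorial:

```sql
{{ config(
    materialized='incremental',
    unique_key='unique_table_sync_key',
    -- BigQuery partitions on the new sync_start_day date column; Databricks/Spark receive it as a partition list
    partition_by={
        'field': 'sync_start_day',
        'data_type': 'date'
    } if target.type == 'bigquery' else ['sync_start_day'],
    -- cluster_by is picked up by Snowflake to prune scans on incremental runs
    cluster_by = ['sync_start_day'],
    incremental_strategy='insert_overwrite' if target.type in ('bigquery', 'spark', 'databricks') else 'delete+insert',
    file_format='parquet'
) }}
```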

## Feature Updates
- Updated incremental logic for `fivetran_platform__audit_table` so that it looks back 7 days to catch any late arriving records.
- Updated JSON parsing logic in the following models to prevent run failures when incoming JSON-like strings are invalid.
- `fivetran_platform__audit_table`
- `fivetran_platform__audit_user_activity`
- `fivetran_platform__connector_daily_events`
- `fivetran_platform__connector_status`
- `fivetran_platform__schema_changelog`
- Updated `fivetran_platform__connector_status` to parse only a subset of the `message_data` field to improve compute.

## Under The Hood
- Added macros:
- `fivetran_log_json_parse` to handle the updated JSON parsing.
- `fivetran_log_lookback` for use in `fivetran_platform__audit_table`.
- Updated seeds to test handling of invalid JSON strings.

# dbt_fivetran_log v1.4.3
[PR #112](https://github.com/fivetran/dbt_fivetran_log/pull/112) includes the following updates:

2 changes: 1 addition & 1 deletion README.md
@@ -66,7 +66,7 @@ Include the following Fivetran Platform package version range in your `packages.
```yaml
packages:
- package: fivetran/fivetran_log
version: [">=1.4.0", "<1.5.0"]
version: [">=1.5.0", "<1.6.0"]
```

> Note that although the source connector is now "Fivetran Platform", the package retains the old name of "fivetran_log".
2 changes: 1 addition & 1 deletion dbt_project.yml
@@ -1,6 +1,6 @@
config-version: 2
name: 'fivetran_log'
version: '1.4.3'
version: '1.5.0'
require-dbt-version: [">=1.3.0", "<2.0.0"]

models:
2 changes: 1 addition & 1 deletion docs/catalog.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/manifest.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/run_results.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion integration_tests/dbt_project.yml
@@ -1,5 +1,5 @@
name: 'fivetran_log_integration_tests'
version: '1.4.3'
version: '1.5.0'

config-version: 2
profile: 'integration_tests'
1 change: 1 addition & 0 deletions integration_tests/seeds/connector.csv
@@ -1,3 +1,4 @@
connector_id,_fivetran_synced,connecting_user_id,connector_name,connector_type,destination_id,paused,signed_up,_fivetran_deleted
this_connector,2021-03-12 21:03:45.994,,this_connector,s3,ups_trampoline,true,2020-08-31 19:00:22.913630,false
this_connector,2021-03-12 21:03:45.991,off_ence,this_connector,s3,television_trace,true,2019-03-22 18:49:12.833910,true
aft_gleeful,2023-05-18 15:57:06.879000,intimation_awkwardly,test,s3,obtained_willow,true,2020-05-18 18:13:43.563198,false
3 changes: 2 additions & 1 deletion integration_tests/seeds/destination.csv
@@ -1,3 +1,4 @@
id,_fivetran_synced,account_id,created_at,name,region
ups_trampoline,2021-03-12 20:20:40.650,1k56c2c4xlti6_acct,2019-07-09 07:21:03.935607,s_test,us-test
television_trace,2021-03-12 20:20:40.624,1k56c2c4xlti6_acct,2019-07-10 08:07:48.374015,s_gcp,us-test
television_trace,2021-03-12 20:20:40.624,1k56c2c4xlti6_acct,2019-07-10 08:07:48.374015,s_gcp,us-test
obtained_willow,2023-05-18 15:55:02.025000,1k56c2c4xlti6_acc,2020-05-18 17:28:42.535908,dbt_package_testing_bigquery,us-test
5 changes: 3 additions & 2 deletions integration_tests/seeds/log.csv
@@ -23,9 +23,10 @@ H6AsShJjRoy1n75oVRxjQ08perA=,2021-12-09 14:26:29.814,2021-12-09 20:30:53.903,int
bCgsj8twLxBBhaiBN9N4S35grkI=,2021-12-09 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""table"":""user_history""}",write_to_table_start,123,456abc
UcIZasAY/eulDQF2SQM6Y5W83gU=,2021-12-09 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""table"":""media_insights""}",write_to_table_start,123,456abc
mm0QUb+ldnbSWknrBrF/v24YGvo=,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,,sync_start,123,456abc
Z/kZAi7u6DzO7UrZeWK/ZsAkbno=,2021-12-10 14:27:00.504,2021-12-10 20:30:53.959,intrinsic_departed,INFO,,sync_end,123,456abc
Z/kZAi7u6DzO7UrZeWK/ZsAkbno=,2021-12-10 14:27:00.504,2021-12-10 20:30:53.959,intrinsic_departed,INFO,,sync_end,,456abc
+IX63qGrDFPaTy9/6epjpppk4JI=,2021-12-10 14:26:41.875,2021-12-10 20:30:53.948,intrinsic_departed,INFO,"{""table"":""media_insights""}",write_to_table_end,123,456abc
ZZLSXVYxirh8aKlgdCzo5fvd71g=,2021-12-10 14:26:42.029,2021-12-10 20:30:53.949,intrinsic_departed,INFO,"{""schema"":""instagram_business"",""count"":5,""operationType"":""REPLACED_OR_INSERTED"",""table"":""media_insights""}",records_modified,123,456abc
D7UqnKYn6OT04HkUcPNjXA95ttI=,2021-12-10 14:26:29.719,2021-12-10 20:30:53.878,intrinsic_departed,INFO,"{""table"":""media_insights""}",write_to_table_start,123,456abc
V3H62Gj5c6nB8bY+OdxMHvp/hNs=,2021-12-10 14:26:05.907,2021-12-10 20:30:53.778,intrinsic_departed,INFO,,sync_start,123,456abc
jkLxnPUfQ/9mHaWCmxUQEhul8ZA=,2023-11-09 11:31:31.579000,2023-11-09 15:55:12.560000,protestations_mourned,INFO,"{""actor"":""[email protected]"",""properties"":{""endpoint"":""************"",""apiKey"":""************"",""customEvents"":[],""syncMode"":""AllEvents"",""isE2ETest"":false,""events"":[],""customEventSyncMode"":""AllEvents""},""id"":""iterable""}",create_connector,,
jkLxnPUfQ/9mHaWCmxUQEhul8ZA=,2023-11-09 11:31:31.579000,2023-11-09 15:55:12.560000,protestations_mourned,INFO,"{""actor"":""[email protected]"",""properties"":{""endpoint"":""************"",""apiKey"":""************"",""customEvents"":[],""syncMode"":""AllEvents"",""isE2ETest"":false,""events"":[],""customEventSyncMode"":""AllEvents""},""id"":""iterable""}",create_connector,,
R7UqnKYn6OT04HkUcPNjXA95qqI=,2021-12-10 16:26:29.719,2021-12-10 20:30:53.878,intrinsic_departed,INFO,says actor but not a json,status,,456abc
42 changes: 42 additions & 0 deletions macros/fivetran_log_json_parse.sql
@@ -0,0 +1,42 @@
{% macro fivetran_log_json_parse(string, string_path) -%}

{{ adapter.dispatch('fivetran_log_json_parse', 'fivetran_log') (string, string_path) }}

{%- endmacro %}

{% macro default__fivetran_log_json_parse(string, string_path) %}

{{ fivetran_utils.json_parse(string=string, string_path=string_path) }}

{% endmacro %}

{% macro snowflake__fivetran_log_json_parse(string, string_path) %}

try_parse_json({{ string }}) {%- for s in string_path -%}{% if s is number %}[{{ s }}]{% else %}['{{ s }}']{% endif %}{%- endfor -%}

{% endmacro %}

{% macro redshift__fivetran_log_json_parse(string, string_path) %}

json_extract_path_text(
{{ string }},
{%- for s in string_path -%}'{{ s }}'{%- if not loop.last -%},{%- endif -%}{%- endfor -%},
true ) -- this flag sets null_if_invalid=true

{% endmacro %}

{% macro postgres__fivetran_log_json_parse(string, string_path) %}

case when {{ string }} ~ '^\s*[\{].*[\}]?\s*$' -- Postgres has no native json check, so this will check the string for indicators of a JSON object
then {{ string }}::json #>> '{ {%- for s in string_path -%}{{ s }}{%- if not loop.last -%},{%- endif -%}{%- endfor -%} }'
else null end

{% endmacro %}

{% macro sqlserver__fivetran_log_json_parse(string, string_path) %}

case when isjson({{ string }}) = 1 -- check if json string is valid
then json_value({{ string }}, '$.{%- for s in string_path -%}{{ s }}{%- if not loop.last -%}.{%- endif -%}{%- endfor -%} ')
else null end

{% endmacro %}
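
As a quick sanity check on the dispatch macro above, here is roughly what a call like `fivetran_log_json_parse(string='message_data', string_path=['status'])` renders to on a few warehouses — hand-expanded from the macros above rather than copied from an actual `dbt compile`:

```sql
-- Snowflake: try_parse_json returns null instead of failing on an invalid JSON string
try_parse_json(message_data)['status']

-- Redshift: the trailing true argument sets null_if_invalid = true
json_extract_path_text(message_data, 'status', true)

-- Postgres: a regex pre-check stands in for a native JSON validity test
case when message_data ~ '^\s*[\{].*[\}]?\s*$'
    then message_data::json #>> '{status}'
    else null end
```
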
35 changes: 35 additions & 0 deletions macros/fivetran_log_lookback.sql
@@ -0,0 +1,35 @@
{% macro fivetran_log_lookback(from_date, datepart='day', interval=7, default_start_date='2010-01-01') %}

{{ adapter.dispatch('fivetran_log_lookback', 'fivetran_log') (from_date, datepart='day', interval=7, default_start_date='2010-01-01') }}

{%- endmacro %}

{% macro default__fivetran_log_lookback(from_date, datepart='day', interval=7, default_start_date='2010-01-01') %}

coalesce(
(select {{ dbt.dateadd(datepart=datepart, interval=-interval, from_date_or_timestamp=from_date) }}
from {{ this }}),
{{ "'" ~ default_start_date ~ "'" }}
)

{% endmacro %}

{% macro bigquery__fivetran_log_lookback(from_date, datepart='day', interval=7, default_start_date='2010-01-01') %}

-- Capture the latest timestamp in a call statement instead of a subquery for optimizing BQ costs on incremental runs
{%- call statement('date_agg', fetch_result=True) -%}
select {{ from_date }} from {{ this }}
{%- endcall -%}

-- load the result from the above query into a new variable
{%- set query_result = load_result('date_agg') -%}

-- the query_result is stored as a dataframe. Therefore, we want to now store it as a singular value.
{%- set date_agg = query_result['data'][0][0] %}

coalesce(
{{ dbt.dateadd(datepart='day', interval=-7, from_date_or_timestamp="'" ~ date_agg ~ "'") }},
{{ "'" ~ default_start_date ~ "'" }}
)

{% endmacro %}
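
For context, `fivetran_platform__audit_table` (further down in this diff) calls the lookback macro inside its incremental filter. On non-BigQuery warehouses the default implementation above expands to roughly the commented-out SQL below; the exact dateadd syntax varies by adapter, and `<audit_table>` is a stand-in for `{{ this }}`:

```sql
-- call site inside the model's is_incremental() block
and cast(created_at as date) > {{ fivetran_log.fivetran_log_lookback(from_date='max(sync_start_day)', interval=7) }}

-- approximate default expansion:
-- and cast(created_at as date) > coalesce(
--     (select dateadd(day, -7, max(sync_start_day)) from <audit_table>),
--     '2010-01-01'
-- )
```
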
35 changes: 11 additions & 24 deletions models/fivetran_platform__audit_table.sql
@@ -2,10 +2,10 @@
materialized='incremental',
unique_key='unique_table_sync_key',
partition_by={
'field': 'sync_start',
'data_type': 'timestamp',
'granularity': 'day'
'field': 'sync_start_day',
'data_type': 'date'
} if target.type == 'bigquery' else ['sync_start_day'],
cluster_by = ['sync_start_day'],
incremental_strategy='insert_overwrite' if target.type in ('bigquery', 'spark', 'databricks') else 'delete+insert',
file_format='parquet'
) }}
@@ -14,30 +14,17 @@ with sync_log as (

select
*,
{{ fivetran_utils.json_parse(string='message_data', string_path=['table']) }} as table_name
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name
from {{ ref('stg_fivetran_platform__log') }}
where event_subtype in ('sync_start', 'sync_end', 'write_to_table_start', 'write_to_table_end', 'records_modified')

{% if is_incremental() %}

-- Capture the latest timestamp in a call statement instead of a subquery for optimizing BQ costs on incremental runs
{%- call statement('max_sync_start', fetch_result=True) -%}
select cast(max(sync_start) as date) from {{ this }}
{%- endcall -%}

-- load the result from the above query into a new variable
{%- set query_result = load_result('max_sync_start') -%}

-- the query_result is stored as a dataframe. Therefore, we want to now store it as a singular value.
{%- set max_sync_start = query_result['data'][0][0] -%}

-- compare the new batch of data to the latest sync already stored in this model
and cast(created_at as date) > '{{ max_sync_start }}'
and cast(created_at as date) > {{ fivetran_log.fivetran_log_lookback(from_date='max(sync_start_day)', interval=7) }}

{% endif %}
),


connector as (

select *
@@ -93,10 +80,10 @@ records_modified_log as (
select
connector_id,
created_at,
{{ fivetran_utils.json_parse(string='message_data', string_path=['table']) }} as table_name,
{{ fivetran_utils.json_parse(string='message_data', string_path=['schema']) }} as schema_name,
{{ fivetran_utils.json_parse(string='message_data', string_path=['operationType']) }} as operation_type,
cast ({{ fivetran_utils.json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_int() }}) as row_count
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as table_name,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['schema']) }} as schema_name,
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['operationType']) }} as operation_type,
cast ({{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_int() }}) as row_count
from sync_log
where event_subtype = 'records_modified'

@@ -146,9 +133,9 @@ final as (
select
*,
{{ dbt_utils.generate_surrogate_key(['schema_name','connector_id', 'destination_id', 'table_name', 'write_to_table_start']) }} as unique_table_sync_key, -- for incremental materialization
{{ dbt.date_trunc('day', 'sync_start') }} as sync_start_day -- for partitioning in databricks
cast({{ dbt.date_trunc('day', 'sync_start') }} as date) as sync_start_day -- for partitioning
from sum_records_modified
)

select *
from final
from final
2 changes: 1 addition & 1 deletion models/fivetran_platform__audit_user_activity.sql
@@ -2,7 +2,7 @@ with logs as (

select
*,
{{ fivetran_utils.json_parse(string='message_data', string_path=['actor']) }} as actor_email
{{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['actor']) }} as actor_email
from {{ ref('stg_fivetran_platform__log') }}
where lower(message_data) like '%actor%'
),
2 changes: 1 addition & 1 deletion models/fivetran_platform__connector_daily_events.sql
@@ -16,7 +16,7 @@ log_events as (
when event_subtype in ('create_table', 'alter_table', 'create_schema', 'change_schema_config') then 'schema_change'
else event_subtype end as event_subtype,

sum(case when event_subtype = 'records_modified' then cast( {{ fivetran_utils.json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_int()}} )
sum(case when event_subtype = 'records_modified' then cast( {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_int()}} )
else 1 end) as count_events

from {{ ref('stg_fivetran_platform__log') }}
36 changes: 23 additions & 13 deletions models/fivetran_platform__connector_status.sql
@@ -1,28 +1,39 @@
with transformation_removal as (

select *
select
*,
case when event_subtype in ('status', 'sync_end')
then message_data
else null
end as filtered_message_data
from {{ ref('stg_fivetran_platform__log') }}
where transformation_id is null
),

parse_json as (
select
*,
{{ fivetran_log.fivetran_log_json_parse(string="filtered_message_data", string_path=["status"]) }} as log_status,
{{ fivetran_log.fivetran_log_json_parse(string="filtered_message_data", string_path=["reason"]) }} as log_reason
from transformation_removal
),

connector_log as (
select
*,
sum( case when event_subtype in ('sync_start') then 1 else 0 end) over ( partition by connector_id
order by created_at rows unbounded preceding) as sync_batch_id
from transformation_removal
from parse_json
-- only looking at errors, warnings, and syncs here
where event_type = 'SEVERE'
or event_type = 'WARNING'
or event_subtype like 'sync%'
or (event_subtype = 'status'
and {{ fivetran_utils.json_parse(string="message_data", string_path=["status"]) }} = 'RESCHEDULED'

and {{ fivetran_utils.json_parse(string="message_data", string_path=["reason"]) }} like '%intended behavior%'
and log_status = 'RESCHEDULED'
and log_reason like '%intended behavior%'
) -- for priority-first syncs. these should be captured by event_type = 'WARNING' but let's make sure
or (event_subtype = 'status'
and {{ fivetran_utils.json_parse(string="message_data", string_path=["status"]) }} = 'SUCCESSFUL'
and log_status = 'SUCCESSFUL'
)
-- whole reason is "We have rescheduled the connector to force flush data from the forward sync into your destination. This is intended behavior and means that the connector is working as expected."
),
@@ -72,16 +83,16 @@ connector_metrics as (
then connector_log.created_at else null end) as last_sync_completed_at,

max(case when connector_log.event_subtype in ('status', 'sync_end')
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='SUCCESSFUL'
and connector_log.log_status = 'SUCCESSFUL'
then connector_log.created_at else null end) as last_successful_sync_completed_at,


max(case when connector_log.event_subtype = 'sync_end'
then connector_log.sync_batch_id else null end) as last_sync_batch_id,

max(case when connector_log.event_subtype in ('status', 'sync_end')
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='RESCHEDULED'
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["reason"]) }} like '%intended behavior%'
and connector_log.log_status = 'RESCHEDULED'
and connector_log.log_reason like '%intended behavior%'
then connector_log.created_at else null end) as last_priority_first_sync_completed_at,


@@ -156,8 +167,8 @@ connector_recent_logs as (
and connector_log.event_type != 'INFO'
-- need to explicitly avoid priority first statuses because they are of event_type WARNING
and not (connector_log.event_subtype = 'status'
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='RESCHEDULED'
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["reason"]) }} like '%intended behavior%')
and connector_log.log_status = 'RESCHEDULED'
and connector_log.log_reason like '%intended behavior%')

group by -- remove duplicates, need explicit group by for SQL Server
connector_health_status.connector_id,
@@ -172,7 +183,6 @@ connector_recent_logs as (
connector_log.event_subtype,
connector_log.event_type,
connector_log.message_data

),

final as (
@@ -213,7 +223,7 @@ final as (
connector_recent_logs.last_sync_started_at,
connector_recent_logs.last_sync_completed_at,
connector_recent_logs.set_up_at,
number_of_schema_changes_last_month
schema_changes.number_of_schema_changes_last_month
)

select * from final
6 changes: 3 additions & 3 deletions models/fivetran_platform__schema_changelog.sql
@@ -36,12 +36,12 @@ final as (
message_data,

case
when event_subtype = 'alter_table' then {{ fivetran_utils.json_parse(string='message_data', string_path=['table']) }}
when event_subtype = 'create_table' then {{ fivetran_utils.json_parse(string='message_data', string_path=['name']) }}
when event_subtype = 'alter_table' then {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }}
when event_subtype = 'create_table' then {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['name']) }}
else null end as table_name,

case
when event_subtype = 'create_schema' or event_subtype = 'create_table' then {{ fivetran_utils.json_parse(string='message_data', string_path=['schema']) }}
when event_subtype = 'create_schema' or event_subtype = 'create_table' then {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['schema']) }}
else null end as schema_name

from add_connector_info