Bug/redshift json parse #114
Merged (47 commits, Feb 20, 2024)
Commits (all by fivetran-catfritz, 2024):
d7c0319  bug/redshift-json-parse (Feb 10)
3fd8796  bug/redshift-json-parse (Feb 10)
548f017  update (Feb 11)
4ec7578  update (Feb 12)
d489457  updates (Feb 12)
85689b7  update macro (Feb 12)
1709d45  update macro (Feb 12)
480c346  update macro (Feb 12)
77978d3  update seed (Feb 12)
5a80eac  update (Feb 12)
9ffff41  updates (Feb 12)
1d8b875  updates (Feb 12)
c76acf1  updates (Feb 12)
b05ec50  updates (Feb 12)
32b9428  updates (Feb 13)
193b665  updates (Feb 13)
1c6a764  updates (Feb 13)
dfc7ad5  update incremental (Feb 13)
2ec773f  add lookback (Feb 13)
be958da  add lookback (Feb 13)
6d2d1e1  add lookback (Feb 13)
1aaeb0f  update current time (Feb 13)
bcbe017  updates (Feb 13)
075b720  updates (Feb 13)
5a0e356  updates (Feb 13)
5384a99  json updates (Feb 14)
c90d51d  json updates (Feb 14)
24cc4fc  incremental updates (Feb 15)
205ce6a  incremental updates (Feb 15)
b5ad8e9  updates && regen docs (Feb 16)
5aa991a  updates yml (Feb 16)
a6b7406  update changelog (Feb 16)
ac435aa  add bq logic (Feb 19)
d4d85fd  add bq logic (Feb 19)
01f0612  add bq logic (Feb 19)
dae512d  Apply suggestions from code review (Feb 19)
dd75c37  Apply suggestions from code review (Feb 19)
2618f01  update comment (Feb 19)
41e0792  update (Feb 19)
6a69d61  update changelog (Feb 19)
f50c281  update changelog (Feb 19)
3dfecb7  update changelog (Feb 19)
675f56a  update changelog (Feb 19)
ea57b23  Update CHANGELOG.md (Feb 20)
d26da8a  regen docs (Feb 20)
0fca474  update changelog (Feb 20)
56b1214  update changelog (Feb 20)
2 changes: 1 addition & 1 deletion .buildkite/pipeline.yml
@@ -58,7 +58,7 @@ steps:
commands: |
bash .buildkite/scripts/run_models.sh redshift

- label: ":bricks: Run Tests - Databricks"
- label: ":databricks: Run Tests - Databricks"
key: "run_dbt_databricks"
plugins:
- docker#v3.13.0:
19 changes: 19 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,22 @@
# dbt_fivetran_log v1.5.0
[PR #114](https://github.com/fivetran/dbt_fivetran_log/pull/114) includes the following updates:

## Breaking Changes
- The following changes are marked as a breaking change out of caution, as a full refresh may be required if you are experiencing issues after the update.
- For Bigquery and Databricks users, updated the `partition_by` config to coordinate with the filter used in the incremental logic.
- For Snowflake users, added a `cluster_by` config for performance.

## Feature Updates
- Updated incremental logic for `fivetran_platform__audit_table` so that it looks back 7 days to catch any late arriving records.
- Updated json parsing logic to prevent run failures when incoming json-like strings are invalid.
- Added filter to `fivetran_platform__connector_status` so only necessary log records will be parsed.

## Under The Hood
- Added macros:
- `fivetran_log_json_parse` to handle the updated json parsing.
- `fivetran_log_lookback` for use in `fivetran_platform__audit_table`.
- Updated testing of invalid json strings.
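
(For context, a minimal illustration of the failure mode the new parsing guards against — a hypothetical Redshift snippet, using the invalid string from the updated seed data below. The third argument is the null_if_invalid flag the new macro sets.)

```sql
-- Without the flag, an invalid JSON string aborts the whole query on Redshift:
select json_extract_path_text('says actor but not a json', 'status');       -- raises a JSON parsing error
-- With null_if_invalid set to true, the same input degrades to null instead:
select json_extract_path_text('says actor but not a json', 'status', true); -- returns null
```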

# dbt_fivetran_log v1.4.3
[PR #112](https://github.com/fivetran/dbt_fivetran_log/pull/112) includes the following updates:

2 changes: 1 addition & 1 deletion README.md
@@ -66,7 +66,7 @@ Include the following Fivetran Platform package version range in your `packages.yml`
```yaml
packages:
- package: fivetran/fivetran_log
version: [">=1.4.0", "<1.5.0"]
version: [">=1.5.0", "<1.6.0"]
```

> Note that although the source connector is now "Fivetran Platform", the package retains the old name of "fivetran_log".
2 changes: 1 addition & 1 deletion dbt_project.yml
@@ -1,6 +1,6 @@
config-version: 2
name: 'fivetran_log'
-version: '1.4.3'
+version: '1.5.0'
require-dbt-version: [">=1.3.0", "<2.0.0"]

models:
2 changes: 1 addition & 1 deletion docs/catalog.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/manifest.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/run_results.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion integration_tests/dbt_project.yml
@@ -1,5 +1,5 @@
name: 'fivetran_log_integration_tests'
-version: '1.4.3'
+version: '1.5.0'

config-version: 2
profile: 'integration_tests'
1 change: 1 addition & 0 deletions integration_tests/seeds/connector.csv
@@ -1,3 +1,4 @@
connector_id,_fivetran_synced,connecting_user_id,connector_name,connector_type,destination_id,paused,signed_up,_fivetran_deleted
this_connector,2021-03-12 21:03:45.994,,this_connector,s3,ups_trampoline,true,2020-08-31 19:00:22.913630,false
this_connector,2021-03-12 21:03:45.991,off_ence,this_connector,s3,television_trace,true,2019-03-22 18:49:12.833910,true
+aft_gleeful,2023-05-18 15:57:06.879000,intimation_awkwardly,test,s3,obtained_willow,true,2020-05-18 18:13:43.563198,false
3 changes: 2 additions & 1 deletion integration_tests/seeds/destination.csv
@@ -1,3 +1,4 @@
id,_fivetran_synced,account_id,created_at,name,region
ups_trampoline,2021-03-12 20:20:40.650,1k56c2c4xlti6_acct,2019-07-09 07:21:03.935607,s_test,us-test
-television_trace,2021-03-12 20:20:40.624,1k56c2c4xlti6_acct,2019-07-10 08:07:48.374015,s_gcp,us-test
\ No newline at end of file
+television_trace,2021-03-12 20:20:40.624,1k56c2c4xlti6_acct,2019-07-10 08:07:48.374015,s_gcp,us-test
+obtained_willow,2023-05-18 15:55:02.025000,1k56c2c4xlti6_acc,2020-05-18 17:28:42.535908,dbt_package_testing_bigquery,us-test
5 changes: 3 additions & 2 deletions integration_tests/seeds/log.csv
@@ -23,9 +23,10 @@ H6AsShJjRoy1n75oVRxjQ08perA=,2021-12-09 14:26:29.814,2021-12-09 20:30:53.903,int
bCgsj8twLxBBhaiBN9N4S35grkI=,2021-12-09 14:26:29.744,2021-12-09 20:30:53.901,intrinsic_departed,INFO,"{""table"":""user_history""}",write_to_table_start,123,456abc
UcIZasAY/eulDQF2SQM6Y5W83gU=,2021-12-09 14:26:29.719,2021-12-09 20:30:53.878,intrinsic_departed,INFO,"{""table"":""media_insights""}",write_to_table_start,123,456abc
mm0QUb+ldnbSWknrBrF/v24YGvo=,2021-12-09 14:26:05.907,2021-12-09 20:30:53.778,intrinsic_departed,INFO,,sync_start,123,456abc
-Z/kZAi7u6DzO7UrZeWK/ZsAkbno=,2021-12-10 14:27:00.504,2021-12-10 20:30:53.959,intrinsic_departed,INFO,,sync_end,123,456abc
+Z/kZAi7u6DzO7UrZeWK/ZsAkbno=,2021-12-10 14:27:00.504,2021-12-10 20:30:53.959,intrinsic_departed,INFO,,sync_end,,456abc
+IX63qGrDFPaTy9/6epjpppk4JI=,2021-12-10 14:26:41.875,2021-12-10 20:30:53.948,intrinsic_departed,INFO,"{""table"":""media_insights""}",write_to_table_end,123,456abc
ZZLSXVYxirh8aKlgdCzo5fvd71g=,2021-12-10 14:26:42.029,2021-12-10 20:30:53.949,intrinsic_departed,INFO,"{""schema"":""instagram_business"",""count"":5,""operationType"":""REPLACED_OR_INSERTED"",""table"":""media_insights""}",records_modified,123,456abc
D7UqnKYn6OT04HkUcPNjXA95ttI=,2021-12-10 14:26:29.719,2021-12-10 20:30:53.878,intrinsic_departed,INFO,"{""table"":""media_insights""}",write_to_table_start,123,456abc
V3H62Gj5c6nB8bY+OdxMHvp/hNs=,2021-12-10 14:26:05.907,2021-12-10 20:30:53.778,intrinsic_departed,INFO,,sync_start,123,456abc
-jkLxnPUfQ/9mHaWCmxUQEhul8ZA=,2023-11-09 11:31:31.579000,2023-11-09 15:55:12.560000,protestations_mourned,INFO,"{""actor"":""[email protected]"",""properties"":{""endpoint"":""************"",""apiKey"":""************"",""customEvents"":[],""syncMode"":""AllEvents"",""isE2ETest"":false,""events"":[],""customEventSyncMode"":""AllEvents""},""id"":""iterable""}",create_connector,,
\ No newline at end of file
+jkLxnPUfQ/9mHaWCmxUQEhul8ZA=,2023-11-09 11:31:31.579000,2023-11-09 15:55:12.560000,protestations_mourned,INFO,"{""actor"":""[email protected]"",""properties"":{""endpoint"":""************"",""apiKey"":""************"",""customEvents"":[],""syncMode"":""AllEvents"",""isE2ETest"":false,""events"":[],""customEventSyncMode"":""AllEvents""},""id"":""iterable""}",create_connector,,
+R7UqnKYn6OT04HkUcPNjXA95qqI=,2021-12-10 16:26:29.719,2021-12-10 20:30:53.878,intrinsic_departed,INFO,says actor but not a json,status,,456abc
42 changes: 42 additions & 0 deletions macros/fivetran_log_json_parse.sql
@@ -0,0 +1,42 @@
{% macro fivetran_log_json_parse(string, string_path) -%}

{{ adapter.dispatch('fivetran_log_json_parse', 'fivetran_log') (string, string_path) }}

{%- endmacro %}

{% macro default__fivetran_log_json_parse(string, string_path) %}

{{ fivetran_utils.json_parse(string=string, string_path=string_path) }}

{% endmacro %}

{% macro snowflake__fivetran_log_json_parse(string, string_path) %}

try_parse_json({{ string }}) {%- for s in string_path -%}{% if s is number %}[{{ s }}]{% else %}['{{ s }}']{% endif %}{%- endfor -%}

{% endmacro %}

{% macro redshift__fivetran_log_json_parse(string, string_path) %}

json_extract_path_text(
{{ string }},
{%- for s in string_path -%}'{{ s }}'{%- if not loop.last -%},{%- endif -%}{%- endfor -%},
true ) -- this flag sets null_if_invalid=true

{% endmacro %}

{% macro postgres__fivetran_log_json_parse(string, string_path) %}

case when {{ string }} ~ '^\s*[\{].*[\}]?\s*$' -- Postgres has no native json check, so this will check the string for indicators of a JSON array or JSON object
then {{ string }}::json #>> '{ {%- for s in string_path -%}{{ s }}{%- if not loop.last -%},{%- endif -%}{%- endfor -%} }'
else null end

{% endmacro %}

{% macro sqlserver__fivetran_log_json_parse(string, string_path) %}

case when isjson({{ string }}) = 1 -- check if json string is valid
then json_value({{ string }}, '$.{%- for s in string_path -%}{{ s }}{%- if not loop.last -%}.{%- endif -%}{%- endfor -%} ')
else null end

{% endmacro %}
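
As a reference for how this dispatch is used, here is a minimal sketch of an invocation and the approximate SQL it renders on two of the adapters above (illustrative only; the column and model names come from this package's models):

```sql
-- Invocation in a model:
select
    {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['status']) }} as log_status
from {{ ref('stg_fivetran_platform__log') }}

-- Approximate rendering on Snowflake (try_parse_json yields null on invalid input):
--     try_parse_json(message_data)['status'] as log_status

-- Approximate rendering on Redshift (the trailing true is the null_if_invalid flag):
--     json_extract_path_text(message_data, 'status', true) as log_status
```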
15 changes: 15 additions & 0 deletions macros/fivetran_log_lookback.sql
@@ -0,0 +1,15 @@
{% macro fivetran_log_lookback(from_date, datepart='day', interval=7, default_start_date='2010-01-01') %}

{{ adapter.dispatch('fivetran_log_lookback', 'fivetran_log') (from_date, datepart=datepart, interval=interval, default_start_date=default_start_date) }}

{%- endmacro %}

{% macro default__fivetran_log_lookback(from_date, datepart='day', interval=7, default_start_date='2010-01-01') %}

coalesce(
    (select {{ dbt.dateadd(datepart=datepart, interval=-interval, from_date_or_timestamp=from_date) }}
    from {{ this }}),
    {{ "'" ~ default_start_date ~ "'" }}
)

{% endmacro %}

Review thread on the coalesce above:

Contributor (reviewer): This may be a strange question, but in what scenario would we need a default_start_date? Shouldn't there always be a max(date) on incremental runs? What scenario would there be where we need the coalesce to select the default start date?

fivetran-catfritz (Contributor, Author), Feb 19, 2024: Good point. I trialed removing this and it seems to compile and run fine. However, for now I will leave this pending discussion on Tuesday.

Contributor (reviewer): Thanks, I just wanted to make sure there likely wouldn't be a scenario where this field would actually be null. I don't believe including it will hurt, so I am comfortable leaving it in.
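
And a sketch of what the default lookback renders to when invoked as in the audit table model below (assumed dateadd rendering; the schema-qualified table name is a hypothetical stand-in for {{ this }}):

```sql
-- fivetran_log.fivetran_log_lookback(from_date='max(sync_start_day)', interval=7)
-- compiles to approximately:
coalesce(
    (select dateadd(day, -7, max(sync_start_day))
    from analytics.fivetran_platform__audit_table),  -- {{ this }}
    '2010-01-01'                                     -- default_start_date fallback
)
```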
25 changes: 6 additions & 19 deletions models/fivetran_platform__audit_table.sql
@@ -2,10 +2,10 @@
materialized='incremental',
unique_key='unique_table_sync_key',
partition_by={
-        'field': 'sync_start',
-        'data_type': 'timestamp',
-        'granularity': 'day'
+        'field': 'sync_start_day',
+        'data_type': 'date'
    } if target.type == 'bigquery' else ['sync_start_day'],
+   cluster_by = ['sync_start_day'],
incremental_strategy='insert_overwrite' if target.type in ('bigquery', 'spark', 'databricks') else 'delete+insert',
file_format='parquet'
) }}
@@ -20,24 +20,11 @@ with sync_log as (

{% if is_incremental() %}

-    -- Capture the latest timestamp in a call statement instead of a subquery for optimizing BQ costs on incremental runs
-    {%- call statement('max_sync_start', fetch_result=True) -%}
-        select cast(max(sync_start) as date) from {{ this }}
-    {%- endcall -%}
-
-    -- load the result from the above query into a new variable
-    {%- set query_result = load_result('max_sync_start') -%}
-
-    -- the query_result is stored as a dataframe. Therefore, we want to now store it as a singular value.
-    {%- set max_sync_start = query_result['data'][0][0] -%}
-
-    -- compare the new batch of data to the latest sync already stored in this model
-    and cast(created_at as date) > '{{ max_sync_start }}'
+    and cast(created_at as date) > {{ fivetran_log.fivetran_log_lookback(from_date='max(sync_start_day)', interval=7) }}

{% endif %}
),


connector as (

select *
@@ -146,9 +133,9 @@ final as (
select
*,
{{ dbt_utils.generate_surrogate_key(['schema_name','connector_id', 'destination_id', 'table_name', 'write_to_table_start']) }} as unique_table_sync_key, -- for incremental materialization
-        {{ dbt.date_trunc('day', 'sync_start') }} as sync_start_day -- for partitioning in databricks
+        cast({{ dbt.date_trunc('day', 'sync_start') }} as date) as sync_start_day -- for partitioning
from sum_records_modified
)

select *
-from final
\ No newline at end of file
+from final
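
Taken together with the delete+insert strategy configured above, the new predicate means late-arriving rows inside the 7-day window replace their earlier versions rather than duplicating them. A rough sketch of the assumed delete+insert semantics (table and staging names hypothetical):

```sql
-- dbt's delete+insert incremental strategy, approximately:
delete from analytics.fivetran_platform__audit_table
where unique_table_sync_key in (select unique_table_sync_key from new_batch);

insert into analytics.fivetran_platform__audit_table
select * from new_batch;  -- new_batch = rows whose created_at falls inside the lookback window
```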
2 changes: 1 addition & 1 deletion models/fivetran_platform__audit_user_activity.sql
@@ -2,7 +2,7 @@ with logs as (

select
*,
-        {{ fivetran_utils.json_parse(string='message_data', string_path=['actor']) }} as actor_email
+        {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['actor']) }} as actor_email
from {{ ref('stg_fivetran_platform__log') }}
where lower(message_data) like '%actor%'
),
2 changes: 1 addition & 1 deletion models/fivetran_platform__connector_daily_events.sql
@@ -16,7 +16,7 @@ log_events as (
when event_subtype in ('create_table', 'alter_table', 'create_schema', 'change_schema_config') then 'schema_change'
else event_subtype end as event_subtype,

-        sum(case when event_subtype = 'records_modified' then cast( {{ fivetran_utils.json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_int() }} )
+        sum(case when event_subtype = 'records_modified' then cast( {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['count']) }} as {{ dbt.type_int() }} )
else 1 end) as count_events

from {{ ref('stg_fivetran_platform__log') }}
35 changes: 22 additions & 13 deletions models/fivetran_platform__connector_status.sql
@@ -1,28 +1,38 @@
with transformation_removal as (

-    select *
+    select
+        *,
+        case when event_subtype in ('status', 'sync_end')
+            then message_data
+            else null end as filtered_message_data
from {{ ref('stg_fivetran_platform__log') }}
where transformation_id is null
),

+parse_json as (
+    select
+        *,
+        {{ fivetran_log.fivetran_log_json_parse(string="filtered_message_data", string_path=["status"]) }} as log_status,
+        {{ fivetran_log.fivetran_log_json_parse(string="filtered_message_data", string_path=["reason"]) }} as log_reason
+    from transformation_removal
+),

connector_log as (
select
*,
sum( case when event_subtype in ('sync_start') then 1 else 0 end) over ( partition by connector_id
order by created_at rows unbounded preceding) as sync_batch_id
-    from transformation_removal
+    from parse_json
-- only looking at errors, warnings, and syncs here
where event_type = 'SEVERE'
or event_type = 'WARNING'
or event_subtype like 'sync%'
    or (event_subtype = 'status'
-        and {{ fivetran_utils.json_parse(string="message_data", string_path=["status"]) }} = 'RESCHEDULED'
-        and {{ fivetran_utils.json_parse(string="message_data", string_path=["reason"]) }} like '%intended behavior%'
+        and log_status = 'RESCHEDULED'
+        and log_reason like '%intended behavior%'
) -- for priority-first syncs. these should be captured by event_type = 'WARNING' but let's make sure
or (event_subtype = 'status'
and {{ fivetran_utils.json_parse(string="message_data", string_path=["status"]) }} = 'SUCCESSFUL'
and log_status = 'SUCCESSFUL'
)
-- whole reason is "We have rescheduled the connector to force flush data from the forward sync into your destination. This is intended behavior and means that the connector is working as expected."
),
@@ -72,16 +82,16 @@ connector_metrics as (
then connector_log.created_at else null end) as last_sync_completed_at,

max(case when connector_log.event_subtype in ('status', 'sync_end')
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='SUCCESSFUL'
and connector_log.log_status = 'SUCCESSFUL'
then connector_log.created_at else null end) as last_successful_sync_completed_at,


max(case when connector_log.event_subtype = 'sync_end'
then connector_log.sync_batch_id else null end) as last_sync_batch_id,

max(case when connector_log.event_subtype in ('status', 'sync_end')
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='RESCHEDULED'
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["reason"]) }} like '%intended behavior%'
and connector_log.log_status = 'RESCHEDULED'
and connector_log.log_reason like '%intended behavior%'
then connector_log.created_at else null end) as last_priority_first_sync_completed_at,


@@ -156,8 +166,8 @@ connector_recent_logs as (
and connector_log.event_type != 'INFO'
-- need to explicitly avoid priority first statuses because they are of event_type WARNING
and not (connector_log.event_subtype = 'status'
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='RESCHEDULED'
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["reason"]) }} like '%intended behavior%')
and connector_log.log_status = 'RESCHEDULED'
and connector_log.log_reason like '%intended behavior%')

group by -- remove duplicates, need explicit group by for SQL Server
connector_health_status.connector_id,
@@ -172,7 +182,6 @@ connector_recent_logs as (
connector_log.event_subtype,
connector_log.event_type,
    connector_log.message_data
-
),

final as (
@@ -213,7 +222,7 @@ final as (
connector_recent_logs.last_sync_started_at,
connector_recent_logs.last_sync_completed_at,
connector_recent_logs.set_up_at,
-        number_of_schema_changes_last_month
+        schema_changes.number_of_schema_changes_last_month
)

select * from final
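
The filtered_message_data column added above is the optimization: only status and sync_end events carry the status/reason keys, so every other row is nulled before parsing and the JSON function only ever evaluates real candidates. Roughly, on Snowflake (illustrative rendering of the parse_json CTE):

```sql
select
    *,
    try_parse_json(filtered_message_data)['status'] as log_status,  -- null for all non-status/sync_end rows
    try_parse_json(filtered_message_data)['reason'] as log_reason
from transformation_removal
```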
6 changes: 3 additions & 3 deletions models/fivetran_platform__schema_changelog.sql
@@ -36,12 +36,12 @@ final as (
message_data,

case
-            when event_subtype = 'alter_table' then {{ fivetran_utils.json_parse(string='message_data', string_path=['table']) }}
-            when event_subtype = 'create_table' then {{ fivetran_utils.json_parse(string='message_data', string_path=['name']) }}
+            when event_subtype = 'alter_table' then {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }}
+            when event_subtype = 'create_table' then {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['name']) }}
else null end as table_name,

case
-            when event_subtype = 'create_schema' or event_subtype = 'create_table' then {{ fivetran_utils.json_parse(string='message_data', string_path=['schema']) }}
+            when event_subtype = 'create_schema' or event_subtype = 'create_table' then {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['schema']) }}
else null end as schema_name

from add_connector_info
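
For reference, the two payload shapes this case statement distinguishes; the example values are modeled on the seed data and are illustrative:

```sql
-- alter_table events carry the table name under "table":
--     {"table": "user_history"}
-- create_table events carry it under "name" (with "schema" alongside):
--     {"name": "media_insights", "schema": "instagram_business"}
select
    {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['table']) }} as altered_table,
    {{ fivetran_log.fivetran_log_json_parse(string='message_data', string_path=['name']) }} as created_table
from add_connector_info
```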