
Commit

Merge pull request #41 from fivetran/bugfix/connector-status
Bugfix/connector status
fivetran-joemarkiewicz authored Mar 23, 2022
2 parents 8ed0c8a + b6adedc commit 65ac6bf
Showing 8 changed files with 41 additions and 16 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,7 @@
# dbt_fivetran_log v0.5.3
## Fixes
- Per the [Fivetran Log December 2021 Release Notes](https://fivetran.com/docs/logs/changelog#december2021), every sync now results in a final `sync_end` event. In the previous version of this package, a successful sync was identified simply by the presence of a `sync_end` event, and anything else was treated as some form of broken. Since all syncs now emit a `sync_end` event, the package has been updated to account for this change within the connector status logic.
- To account for the above fix, a new field (`last_successful_sync_completed_at`) was added to the `fivetran_log__connector_status` model. This field captures the last successful sync for the connector.
# dbt_fivetran_log v0.5.2
## Fixes
- The `fivetran_log__connector_status` model uses a date function off of `created_at` from the `stg_fivetran_log__log` model. This fails on certain redshift destinations as the timestamp is synced as `timestamptz`. Therefore, the field within the staging model is cast using `dbt_utils.type_timestamp` to appropriately cast the field for downstream functions. Further, to future proof, timestamps were cast within the following staging models: `account`, `account_membership`, `active_volume`, `destination_membership`, `destination`, `log`, `transformation`, and `user`. ([#40](https://github.com/fivetran/dbt_fivetran_log/pull/40))
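To make the v0.5.3 fix above concrete, here is a minimal sketch of the success-detection logic the new `last_successful_sync_completed_at` field relies on, simplified from the `fivetran_log__connector_status` changes further down in this diff. The packaged model also tracks sync batches, errors, and warnings, so treat this as illustrative rather than the shipped SQL:

```sql
with connector_log as (

    -- staging model from the dbt_fivetran_log package
    select *
    from {{ ref('stg_fivetran_log__log') }}

)

select
    connector_id,

    -- every sync now ends in a sync_end event, so completion alone no longer implies
    -- success; instead, check the parsed status in message_data
    max(case when event_subtype in ('status', 'sync_end')
        and {{ fivetran_utils.json_parse(string="message_data", string_path=["status"]) }} = 'SUCCESSFUL'
        then created_at else null end) as last_successful_sync_completed_at

from connector_log
group by 1
```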
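The v0.5.2 entry above describes the cross-database timestamp cast applied in the staging models. A hedged sketch of that pattern, assuming a `source()` reference for illustration only (the actual staging models select additional columns):

```sql
select
    -- cast the synced timestamptz (seen on some Redshift destinations) to a plain
    -- timestamp so downstream date functions behave consistently
    cast(created_at as {{ dbt_utils.type_timestamp() }}) as created_at
from {{ source('fivetran_log', 'log') }}  -- illustrative source reference
```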
2 changes: 1 addition & 1 deletion dbt_project.yml
@@ -1,7 +1,7 @@
config-version: 2

name: 'fivetran_log'
version: '0.5.2'
version: '0.5.3'

require-dbt-version: [">=1.0.0", "<2.0.0"]

2 changes: 1 addition & 1 deletion docs/catalog.json

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions docs/index.html

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/manifest.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion integration_tests/dbt_project.yml
@@ -1,5 +1,5 @@
name: 'fivetran_log_integration_tests'
version: '0.5.2'
version: '0.5.3'
config-version: 2
profile: 'integration_tests'

6 changes: 5 additions & 1 deletion models/fivetran_log.yml
@@ -33,14 +33,18 @@ models:
- name: connector_health
description: Status of the connector's data flow. Can be `"broken"`, `"incomplete"`, `"connected"`, `"paused"`, `"initial sync in progress"`, or `"priority first sync"`.

- name: last_successful_sync_completed_at
description: >
Timestamp of when the connector's last successful sync was completed. Unlike `last_sync_completed_at`, sync attempts that ended in failure are excluded.
- name: last_sync_started_at
description: >
Timestamp of when the last sync was started. Note that if new data comes in after a sync has
begun, the new data will not be included in this sync.
- name: last_sync_completed_at
description: >
Timestamp of when the last sync was complted.
Timestamp of when the last sync was completed.
- name: set_up_at
description: Timestamp of when the connector was set up.
33 changes: 25 additions & 8 deletions models/fivetran_log__connector_status.sql
@@ -1,17 +1,22 @@
with connector_log as (

select *
select *,
sum( case when event_subtype in ('sync_start') then 1 else 0 end) over ( partition by connector_id
order by created_at rows unbounded preceding) as sync_batch_id
from {{ ref('stg_fivetran_log__log') }}

-- only looking at errors, warnings, and syncs here
where event_type = 'SEVERE'
or event_type = 'WARNING'
or event_subtype like 'sync%'
or (event_subtype = 'status'
and {{ fivetran_utils.json_parse(string="message_data", string_path=["status"]) }} ='RESCHEDULED'
and {{ fivetran_utils.json_parse(string="message_data", string_path=["status"]) }} = 'RESCHEDULED'

and {{ fivetran_utils.json_parse(string="message_data", string_path=["reason"]) }} like '%intended behavior%'
) -- for priority-first syncs. these should be captured by event_type = 'WARNING' but let's make sure

or (event_subtype = 'status'
and {{ fivetran_utils.json_parse(string="message_data", string_path=["status"]) }} = 'SUCCESSFUL'
)
-- whole reason is "We have rescheduled the connector to force flush data from the forward sync into your destination. This is intended behavior and means that the connector is working as expected."
),

@@ -58,13 +63,23 @@ connector_metrics as (
max(case when connector_log.event_subtype = 'sync_end'
then connector_log.created_at else null end) as last_sync_completed_at,

max(case when connector_log.event_subtype = 'status'
max(case when connector_log.event_subtype in ('status', 'sync_end')
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='SUCCESSFUL'
then connector_log.created_at else null end) as last_successful_sync_completed_at,


max(case when connector_log.event_subtype = 'sync_end'
then connector_log.sync_batch_id else null end) as last_sync_batch_id,

max(case when connector_log.event_subtype in ('status', 'sync_end')
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='RESCHEDULED'
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["reason"]) }} like '%intended behavior%'
then connector_log.created_at else null end) as last_priority_first_sync_completed_at,


max(case when connector_log.event_type = 'SEVERE' then connector_log.created_at else null end) as last_error_at,

max(case when connector_log.event_type = 'SEVERE' then connector_log.sync_batch_id else null end) as last_error_batch,
max(case when event_type = 'WARNING' then connector_log.created_at else null end) as last_warning_at

from connector
@@ -94,8 +109,8 @@ connector_health as (
-- a sync has been attempted, but not completed, and it's not due to errors. also a priority-first sync hasn't completed
when last_sync_completed_at is null and last_error_at is null then 'initial sync in progress'

-- there's been an error since the connector last completed a sync
when last_error_at > last_sync_completed_at then 'broken'
-- the last attempted sync had an error
when last_sync_batch_id = last_error_batch then 'broken'

-- there's never been a successful sync and there have been errors
when last_sync_completed_at is null and last_error_at is not null then 'broken'
@@ -114,6 +129,7 @@ connector_recent_logs as (
connector_health.connector_type,
connector_health.destination_id,
connector_health.connector_health,
connector_health.last_successful_sync_completed_at,
connector_health.last_sync_started_at,
connector_health.last_sync_completed_at,
connector_health.set_up_at,
@@ -133,7 +149,7 @@ connector_recent_logs as (
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='RESCHEDULED'
and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["reason"]) }} like '%intended behavior%')

{{ dbt_utils.group_by(n=11) }} -- de-duping error messages
{{ dbt_utils.group_by(n=12) }} -- de-duping error messages


),
@@ -147,6 +163,7 @@ final as (
connector_recent_logs.destination_id,
destination.destination_name,
connector_recent_logs.connector_health,
connector_recent_logs.last_successful_sync_completed_at,
connector_recent_logs.last_sync_started_at,
connector_recent_logs.last_sync_completed_at,
connector_recent_logs.set_up_at,
@@ -162,7 +179,7 @@ final as (
on connector_recent_logs.connector_id = schema_changes.connector_id

join destination on destination.destination_id = connector_recent_logs.destination_id
{{ dbt_utils.group_by(n=10) }}
{{ dbt_utils.group_by(n=11) }}
)

select * from final
