Merge pull request #36 from fivetran/feature/databricks
Feature/databricks
fivetran-jamie authored Feb 24, 2022
2 parents 0b06213 + 2e775a6 commit 691b82a
Showing 12 changed files with 68 additions and 24 deletions.
11 changes: 11 additions & 0 deletions .circleci/config.yml
@@ -25,6 +25,17 @@ jobs:
pip install -r integration_tests/requirements.txt
mkdir -p ~/.dbt
cp integration_tests/ci/sample.profiles.yml ~/.dbt/profiles.yml
+- run:
+    name: "Run Tests - Spark"
+    command: |
+      . venv/bin/activate
+      echo `pwd`
+      cd integration_tests
+      dbt deps
+      dbt seed --target spark --full-refresh
+      dbt run --target spark --full-refresh
+      dbt run --target spark
+      dbt test --target spark
- run:
name: "Run Tests - Redshift"
command: |
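Running the models twice against the Spark target, first with `--full-refresh` and then without, is a common dbt CI pattern: it exercises both the initial-build and the incremental code paths of the package before `dbt test` runs.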
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,8 @@
+# dbt_fivetran_log v0.5.1
+
+## Features
+This release just introduces Databricks compatibility! 🧱🧱
+
# dbt_fivetran_log v0.5.0
🎉 Official dbt v1.0.0 Compatibility Release 🎉
## 🚨 Breaking Changes 🚨
12 changes: 11 additions & 1 deletion README.md
@@ -122,7 +122,7 @@ and running the package? If so, we highly encourage and welcome contributions to
Please create issues or open PRs against `main`. See [the Discourse post](https://discourse.getdbt.com/t/contributing-to-a-dbt-package/657) for information on how to contribute to a package.

## Database Support
-This package has been tested on BigQuery, Snowflake, Redshift, and Postgres.
+This package has been tested on BigQuery, Snowflake, Redshift, Postgres, and Databricks.
+
+### Databricks Dispatch Configuration
+dbt `v0.20.0` introduced a new project-level dispatch configuration that enables an "override" setting for all dispatched macros. If you are using a Databricks destination with this package, you will need to add the following dispatch configuration (or a variation of it) within your `dbt_project.yml`. This is required for the package to search for macros first in the `dbt-labs/spark_utils` package and then in the `dbt-labs/dbt_utils` package.
+```yml
+# dbt_project.yml
+dispatch:
+  - macro_namespace: dbt_utils
+    search_order: ['spark_utils', 'dbt_utils']
+```

## Resources:
- Provide [feedback](https://www.surveymonkey.com/r/DQ7K7WW) on our existing dbt packages or what you'd like to see next
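In the dispatch configuration above, listing `spark_utils` before `dbt_utils` tells dbt to look for a Spark-compatible shim of each `dbt_utils` macro first, falling back to the stock `dbt_utils` implementation when no shim exists; the `dbt-labs/spark_utils` entry added to `packages.yml` at the bottom of this diff is what makes those shims available.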
2 changes: 1 addition & 1 deletion dbt_project.yml
@@ -1,7 +1,7 @@
config-version: 2

name: 'fivetran_log'
-version: '0.5.0'
+version: '0.5.1'

require-dbt-version: [">=1.0.0", "<2.0.0"]

12 changes: 12 additions & 0 deletions integration_tests/ci/sample.profiles.yml
@@ -43,4 +43,16 @@ integration_tests:
dbname: "{{ env_var('CI_POSTGRES_DBT_DBNAME') }}"
port: 5432
schema: fivetran_log_integration_tests
threads: 8
+spark:
+  type: spark
+  method: http
+  schema: fivetran_log_integration_tests
+  host: "{{ env_var('CI_SPARK_DBT_HOST') }}"
+  organization: "{{ env_var('CI_SPARK_DBT_ORGANIZATION') }}"
+  token: "{{ env_var('CI_SPARK_DBT_TOKEN') }}"
+  cluster: "{{ env_var('CI_SPARK_DBT_CLUSTER') }}"
+  port: 443
+  connect_timeout: 60
+  connect_retries: 5
+  threads: 8
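The `connect_timeout` and `connect_retries` settings matter here because the Databricks cluster may still be starting when CI first connects; retrying up to 5 times with a 60-second timeout gives a cold cluster time to come up before the job fails.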
15 changes: 9 additions & 6 deletions integration_tests/dbt_project.yml
@@ -1,8 +1,11 @@
name: 'fivetran_log_integration_tests'
-version: '0.5.0'
+version: '0.5.1'
config-version: 2
profile: 'integration_tests'

+dispatch:
+  - macro_namespace: dbt_utils
+    search_order: ['spark_utils', 'dbt_utils']

vars:
fivetran_log:
@@ -43,25 +46,25 @@ seeds:
destination:
+column_types:
created_at: timestamp
id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
id: "{{ 'string' if target.type in ('bigquery', 'spark') else 'varchar' }}"
destination_membership:
+column_types:
activated_at: timestamp
joined_at: timestamp
log:
+column_types:
time_stamp: timestamp
-transformation_id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
+transformation_id: "{{ 'string' if target.type in ('bigquery', 'spark') else 'varchar' }}"
transformation:
+column_types:
created_at: timestamp
-destination_id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
-id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
+destination_id: "{{ 'string' if target.type in ('bigquery', 'spark') else 'varchar' }}"
+id: "{{ 'string' if target.type in ('bigquery', 'spark') else 'varchar' }}"
trigger_table:
+quote_columns: "{{ true if target.type in ('redshift', 'postgres') else false }}"
+enabled: "{{ true if target.type != 'snowflake' else false }}"
+column_types:
-transformation_id: "{{ 'string' if target.name == 'bigquery' else 'varchar' }}"
+transformation_id: "{{ 'string' if target.type in ('bigquery', 'spark') else 'varchar' }}"
trigger_table_snowflake:
+enabled: "{{ true if target.type == 'snowflake' else false }}"
user:
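Two things change in these seed configurations: `target.name` becomes `target.type`, so the conditionals key off the adapter type rather than whatever the profile target happens to be named, and `'spark'` joins `'bigquery'` in the set of warehouses that use `string` rather than `varchar` column types.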
10 changes: 5 additions & 5 deletions models/fivetran_log__audit_table.sql
@@ -14,7 +14,7 @@ with sync_log as (

select
*,
-{{ fivetran_utils.json_extract(string='message_data', string_path='table') }} as table_name
+{{ fivetran_utils.json_parse(string='message_data', string_path=['table']) }} as table_name

from {{ ref('stg_fivetran_log__log') }}

@@ -101,10 +101,10 @@ records_modified_log as (
select
connector_id,
created_at,
-{{ fivetran_utils.json_extract(string='message_data', string_path='table') }} as table_name,
-{{ fivetran_utils.json_extract(string='message_data', string_path='schema') }} as schema_name,
-{{ fivetran_utils.json_extract(string='message_data', string_path='operationType') }} as operation_type,
-cast ({{ fivetran_utils.json_extract(string='message_data', string_path='count') }} as {{ dbt_utils.type_int() }}) as row_count
+{{ fivetran_utils.json_parse(string='message_data', string_path=['table']) }} as table_name,
+{{ fivetran_utils.json_parse(string='message_data', string_path=['schema']) }} as schema_name,
+{{ fivetran_utils.json_parse(string='message_data', string_path=['operationType']) }} as operation_type,
+cast ({{ fivetran_utils.json_parse(string='message_data', string_path=['count']) }} as {{ dbt_utils.type_int() }}) as row_count

from sync_log
where event_subtype = 'records_modified'
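Throughout the models, `fivetran_utils.json_extract` (which took the path as a single string) is replaced by `fivetran_utils.json_parse`, which takes the path as a list and dispatches per adapter, so Spark targets get valid JSON-extraction SQL. As a rough, hypothetical sketch of how such an adapter-dispatched macro can be built (illustrative only, not the actual `fivetran_utils` source):

```sql
-- Hypothetical sketch only; the real macro ships in fivetran/fivetran_utils.
{% macro json_parse_sketch(string, string_path) %}
    {{ return(adapter.dispatch('json_parse_sketch')(string, string_path)) }}
{% endmacro %}

{% macro default__json_parse_sketch(string, string_path) %}
    {# Redshift/Postgres style: each path element is a separate argument #}
    json_extract_path_text({{ string }}, {% for s in string_path %}'{{ s }}'{% if not loop.last %}, {% endif %}{% endfor %})
{% endmacro %}

{% macro bigquery__json_parse_sketch(string, string_path) %}
    {# BigQuery extracts scalars with a JSONPath string #}
    json_extract_scalar({{ string }}, '$.{{ string_path | join(".") }}')
{% endmacro %}

{% macro spark__json_parse_sketch(string, string_path) %}
    {# Spark SQL's get_json_object also takes a JSONPath string #}
    get_json_object({{ string }}, '$.{{ string_path | join(".") }}')
{% endmacro %}
```

Under this sketch, `{{ json_parse_sketch(string='message_data', string_path=['operationType']) }}` would render to `get_json_object(message_data, '$.operationType')` on a Spark target.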
2 changes: 1 addition & 1 deletion models/fivetran_log__connector_daily_events.sql
@@ -16,7 +16,7 @@ log_events as (
when event_subtype in ('create_table', 'alter_table', 'create_schema', 'change_schema_config') then 'schema_change'
else event_subtype end as event_subtype,

-sum(case when event_subtype = 'records_modified' then cast( {{ fivetran_utils.json_extract(string='message_data', string_path='count') }} as {{ dbt_utils.type_int()}} )
+sum(case when event_subtype = 'records_modified' then cast( {{ fivetran_utils.json_parse(string='message_data', string_path=['count']) }} as {{ dbt_utils.type_int()}} )
else 1 end) as count_events

from {{ ref('stg_fivetran_log__log') }}
12 changes: 6 additions & 6 deletions models/fivetran_log__connector_status.sql
@@ -8,8 +8,8 @@ with connector_log as (
or event_type = 'WARNING'
or event_subtype like 'sync%'
or (event_subtype = 'status'
-and {{ fivetran_utils.json_extract(string="message_data", string_path="status") }} ='RESCHEDULED'
-and {{ fivetran_utils.json_extract(string="message_data", string_path="reason") }} like '%intended behavior%'
+and {{ fivetran_utils.json_parse(string="message_data", string_path=["status"]) }} ='RESCHEDULED'
+and {{ fivetran_utils.json_parse(string="message_data", string_path=["reason"]) }} like '%intended behavior%'
) -- for priority-first syncs. these should be captured by event_type = 'WARNING' but let's make sure

-- whole reason is "We have rescheduled the connector to force flush data from the forward sync into your destination. This is intended behavior and means that the connector is working as expected."
@@ -59,8 +59,8 @@ connector_metrics as (
then connector_log.created_at else null end) as last_sync_completed_at,

max(case when connector_log.event_subtype = 'status'
-and {{ fivetran_utils.json_extract(string="connector_log.message_data", string_path="status") }} ='RESCHEDULED'
-and {{ fivetran_utils.json_extract(string="connector_log.message_data", string_path="reason") }} like '%intended behavior%'
+and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='RESCHEDULED'
+and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["reason"]) }} like '%intended behavior%'
then connector_log.created_at else null end) as last_priority_first_sync_completed_at,


@@ -130,8 +130,8 @@ connector_recent_logs as (
and connector_log.event_type != 'INFO'
-- need to explicitly avoid priority first statuses because they are of event_type WARNING
and not (connector_log.event_subtype = 'status'
-and {{ fivetran_utils.json_extract(string="connector_log.message_data", string_path="status") }} ='RESCHEDULED'
-and {{ fivetran_utils.json_extract(string="connector_log.message_data", string_path="reason") }} like '%intended behavior%')
+and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["status"]) }} ='RESCHEDULED'
+and {{ fivetran_utils.json_parse(string="connector_log.message_data", string_path=["reason"]) }} like '%intended behavior%')

{{ dbt_utils.group_by(n=11) }} -- de-duping error messages

6 changes: 3 additions & 3 deletions models/fivetran_log__schema_changelog.sql
@@ -35,12 +35,12 @@ final as (
message_data,

case
-when event_subtype = 'alter_table' then {{ fivetran_utils.json_extract(string='message_data', string_path='table') }}
-when event_subtype = 'create_table' then {{ fivetran_utils.json_extract(string='message_data', string_path='name') }}
+when event_subtype = 'alter_table' then {{ fivetran_utils.json_parse(string='message_data', string_path=['table']) }}
+when event_subtype = 'create_table' then {{ fivetran_utils.json_parse(string='message_data', string_path=['name']) }}
else null end as table_name,

case
-when event_subtype = 'create_schema' or event_subtype = 'create_table' then {{ fivetran_utils.json_extract(string='message_data', string_path='schema') }}
+when event_subtype = 'create_schema' or event_subtype = 'create_table' then {{ fivetran_utils.json_parse(string='message_data', string_path=['schema']) }}
else null end as schema_name

from add_connector_info
2 changes: 1 addition & 1 deletion models/staging/src_fivetran_log.yml
@@ -2,7 +2,7 @@ version: 2

sources:
- name: fivetran_log
database: "{{ var('fivetran_log_database', target.database)}}" # add var config to dbt_project.yml
database: "{% if target.type != 'spark'%}{{ var('fivetran_log_database', target.database) }}{% endif %}" # add var config to dbt_project.yml
schema: "{{ var('fivetran_log_schema', 'fivetran_log')}}"

loader: fivetran
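The Jinja guard on `database` is there because Spark (pre-Unity Catalog) has no three-part `database.schema.table` namespace, so the property must render empty on Spark targets; every other warehouse keeps the `fivetran_log_database` var with `target.database` as its default.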
3 changes: 3 additions & 0 deletions packages.yml
@@ -1,3 +1,6 @@
packages:
- package: fivetran/fivetran_utils
version: [">=0.3.0", "<0.4.0"]

+- package: dbt-labs/spark_utils
+  version: [">=0.3.0", "<0.4.0"]
