Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add AWS Athena support #383

Merged
merged 11 commits into from
Feb 1, 2024
16 changes: 15 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
# Python virtualenv
.venv/
venv/

# Environment variables
.env

# DBT artifacts
target/
logs/
dbt_modules/
dbt_packages/
.vscode
integration_tests/state/
site/
env/
profiles.yml
package-lock.yml

# IDE
.vscode
.idea

# MacOS
.DS_Store
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ Currently, the following adapters are supported:
- Snowflake
- DuckDB
- Trino (tested with Iceberg connector)
- AWS Athena (tested manually)

## Using This Package

Expand Down
6 changes: 3 additions & 3 deletions dbt_project.yml
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,8 @@ vars:
other_prefixes: ['rpt_']

# -- Performance variables --
chained_views_threshold: "{{ 5 if target.type != 'trino' else 4 }}"
chained_views_threshold: "{{ 5 if target.type not in ['athena', 'trino'] else 4 }}"

# -- Execution variables --
insert_batch_size: "{{ 500 if target.type == 'bigquery' else 10000 }}"
max_depth_dag: "{{ 9 if target.type in ['bigquery', 'spark', 'databricks'] else 4 if target.type == 'trino' else -1 }}"
insert_batch_size: "{{ 500 if target.type in ['athena', 'bigquery'] else 10000 }}"
max_depth_dag: "{{ 9 if target.type in ['bigquery', 'spark', 'databricks'] else 4 if target.type in ['athena', 'trino'] else -1 }}"
36 changes: 35 additions & 1 deletion integration_tests/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,42 @@
## Test dbt Project
# Test dbt Project

The models within this folder (barring those in models/audit_schema_tests) represent a dbt project with poor DAG modeling. Error detection tools within this package are tested on this dbt project.

<img width="1377" alt="DAG of the test dbt project" src="https://user-images.githubusercontent.com/73915542/170353654-58ad303c-adaa-49f6-86b8-723543eb2d3d.png">

## Adding an Integration Test
Create a seed which matches the intended output of your model and add equality tests comparing the output to your seed to the output of your model.

## Local tests

### AWS Athena

To run tests locally, please follow instructions:

* Set up environment variables:

```bash
ATHENA_S3_STAGING_DIR=
ATHENA_S3_DATA_DIR=
ATHENA_REGION=
ATHENA_SCHEMA=
ATHENA_WORKGROUP=
```

* Add `profiles.yml` file based on [sample](ci/sample.profiles.yml):

```yaml
athena: # for local tests only
type: athena
s3_staging_dir: {{ env_var('ATHENA_S3_STAGING_DIR') }}
s3_data_dir: {{ env_var('ATHENA_S3_DATA_DIR') }}
s3_data_naming: schema_table_unique
region_name: {{ env_var('ATHENA_REGION') }}
schema: {{ env_var('ATHENA_SCHEMA') }}
database: awsdatacatalog
work_group: {{ env_var('ATHENA_WORKGROUP') }}
num_retries: 2
threads: 4
```

* Now you can run integration tests, see details [here](../run_test.sh) with `--target athena` flag for dbt commands.
10 changes: 5 additions & 5 deletions integration_tests/seeds/docs/docs_seeds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ seeds:
- name: test_fct_documentation_coverage
config:
column_types:
staging_documentation_coverage_pct: "{{ 'float' if target.type not in ['spark','databricks','duckdb','trino'] else 'decimal(10,2)' }}"
intermediate_documentation_coverage_pct: "{{ 'float' if target.type not in ['spark','databricks','duckdb','trino'] else 'decimal(10,2)' }}"
marts_documentation_coverage_pct: "{{ 'float' if target.type not in ['spark','databricks','duckdb','trino'] else 'decimal(10,2)' }}"
other_documentation_coverage_pct: "{{ 'float' if target.type not in ['spark','databricks','duckdb','trino'] else 'decimal(10,2)' }}"
staging_documentation_coverage_pct: &float "{{ 'float' if target.type not in ['athena', 'databricks', 'duckdb','trino', 'spark'] else 'decimal(10,2)' }}"
intermediate_documentation_coverage_pct: *float
marts_documentation_coverage_pct: *float
other_documentation_coverage_pct: *float
tags:
- docs
tests:
Expand Down Expand Up @@ -48,4 +48,4 @@ seeds:
tests:
- dbt_utils.equality:
name: equality_fct_undocumented_sources
compare_model: ref('fct_undocumented_sources')
compare_model: ref('fct_undocumented_sources')
12 changes: 6 additions & 6 deletions integration_tests/seeds/tests/tests_seeds.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ seeds:
- name: test_fct_test_coverage
config:
column_types:
test_coverage_pct: "{{ 'float' if target.type != 'trino' else 'double' }}"
staging_test_coverage_pct: "{{ 'float' if target.type != 'trino' else 'double' }}"
intermediate_test_coverage_pct: "{{ 'float' if target.type != 'trino' else 'double' }}"
marts_test_coverage_pct: "{{ 'float' if target.type != 'trino' else 'double' }}"
other_test_coverage_pct: "{{ 'float' if target.type != 'trino' else 'double' }}"
test_coverage_pct: &float "{{ 'float' if target.type not in ['athena', 'trino'] else 'double' }}"
staging_test_coverage_pct: *float
intermediate_test_coverage_pct: *float
marts_test_coverage_pct: *float
other_test_coverage_pct: *float
tests:
- dbt_utils.equality:
name: equality_fct_test_coverage
Expand All @@ -28,4 +28,4 @@ seeds:
- staging_test_coverage_pct
- intermediate_test_coverage_pct
- marts_test_coverage_pct
- other_test_coverage_pct
- other_test_coverage_pct
42 changes: 23 additions & 19 deletions macros/recursive_dag.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
{% macro default__recursive_dag() %}

with recursive direct_relationships as (
select
select
*
from {{ ref('int_direct_relationships') }}
where resource_type <> 'test'
Expand Down Expand Up @@ -44,7 +44,7 @@ all_relationships (
path,
is_dependent_on_chain_of_views
) as (
-- anchor
-- anchor
select distinct
resource_id as parent_id,
resource_name as parent,
Expand Down Expand Up @@ -76,11 +76,11 @@ all_relationships (

from direct_relationships
-- where direct_parent_id is null {# optional lever to change filtering of anchor clause to only include root resources #}

union all

-- recursive clause
select
select
all_relationships.parent_id as parent_id,
all_relationships.parent as parent,
all_relationships.parent_resource_type as parent_resource_type,
Expand All @@ -105,12 +105,12 @@ all_relationships (
direct_relationships.directory_path as child_directory_path,
direct_relationships.file_name as child_file_name,
direct_relationships.is_excluded as child_is_excluded,
all_relationships.distance+1 as distance,
all_relationships.distance+1 as distance,
{{ dbt.array_append('all_relationships.path', 'direct_relationships.resource_name') }} as path,
case
when
all_relationships.child_materialized in ('view', 'ephemeral')
and coalesce(all_relationships.is_dependent_on_chain_of_views, true)
case
when
all_relationships.child_materialized in ('view', 'ephemeral')
and coalesce(all_relationships.is_dependent_on_chain_of_views, true)
then true
else false
end as is_dependent_on_chain_of_views
Expand Down Expand Up @@ -145,7 +145,7 @@ all_relationships (
{% endif %}

with direct_relationships as (
select
select
*
from {{ ref('int_direct_relationships') }}
where resource_type <> 'test'
Expand All @@ -161,12 +161,12 @@ with direct_relationships as (
is_public as child_is_public,
access as child_access,
is_excluded as child_is_excluded

from direct_relationships
)

, cte_0 as (
select
select
parent_id,
child_id,
child_materialized,
Expand All @@ -182,19 +182,19 @@ with direct_relationships as (
{% for i in range(1,max_depth) %}
{% set prev_cte_path %}cte_{{ i - 1 }}.path{% endset %}
, cte_{{i}} as (
select
select
cte_{{i - 1}}.parent_id as parent_id,
direct_relationships.resource_id as child_id,
direct_relationships.materialized as child_materialized,
direct_relationships.is_public as child_is_public,
direct_relationships.access as child_access,
direct_relationships.is_excluded as child_is_excluded,
cte_{{i - 1}}.distance+1 as distance,
cte_{{i - 1}}.distance+1 as distance,
{{ dbt.array_append(prev_cte_path, 'direct_relationships.resource_name') }} as path,
case
when
cte_{{i - 1}}.child_materialized in ('view', 'ephemeral')
and coalesce(cte_{{i - 1}}.is_dependent_on_chain_of_views, true)
case
when
cte_{{i - 1}}.child_materialized in ('view', 'ephemeral')
and coalesce(cte_{{i - 1}}.is_dependent_on_chain_of_views, true)
then true
else false
end as is_dependent_on_chain_of_views
Expand Down Expand Up @@ -265,6 +265,10 @@ with direct_relationships as (

{% macro trino__recursive_dag() %}
{#-- Although Trino supports a recursive WITH-queries,
-- it is less performant than creating CTEs with loops and unioning them --#}
-- it is less performant than creating CTEs with loops and union them --#}
{{ return(bigquery__recursive_dag()) }}
{% endmacro %}

{% macro athena__recursive_dag() %}
{{ return(bigquery__recursive_dag()) }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ conversion as (

final as (
select
{{ 'current_timestamp' if target.type != 'trino' else 'current_timestamp(6)' }} as measured_at,
{{ dbt.current_timestamp() if target.type != 'trino' else 'current_timestamp(6)' }} as measured_at,
count(*) as total_models,
sum(is_described_model) as documented_models,
round(sum(is_described_model) * 100.00 / count(*), 2) as documentation_coverage_pct,
Expand Down
4 changes: 2 additions & 2 deletions models/marts/tests/fct_test_coverage.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ conversion as (

final as (
select
{{ 'current_timestamp' if target.type != 'trino' else 'current_timestamp(6)' }} as measured_at,
{{ dbt.current_timestamp() if target.type != 'trino' else 'current_timestamp(6)' }} as measured_at,
count(*) as total_models,
sum(number_of_tests_on_model) as total_tests,
sum(is_tested_model) as tested_models,
Expand All @@ -39,4 +39,4 @@ final as (
on test_counts.resource_name = conversion.resource_name
)

select * from final
select * from final