From bc1012ea9c26ede72db573254dd32a77658173b2 Mon Sep 17 00:00:00 2001 From: Michiel De Smet Date: Wed, 21 Sep 2022 20:32:28 +0200 Subject: [PATCH 1/2] Remove obsolete hive configuration --- README.md | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/README.md b/README.md index 517dcf1c..0040b12f 100644 --- a/README.md +++ b/README.md @@ -122,18 +122,6 @@ on your Trino instance. See also: https://trino.io/docs/current/security/authentication-types.html -#### Required configuration - -dbt fundamentally works by dropping and creating tables and views in databases. -As such, the following Trino configs must be set for dbt to work properly on Trino: - -```properties -hive.metastore-cache-ttl=0s -hive.metastore-refresh-interval = 5s -hive.allow-drop-table=true -hive.allow-rename-table=true -``` - #### Session properties per model In some specific cases, there may be needed tuning through the Trino session properties only From 0e16988e75ba1c1d9887384c9a26d841927ed1db Mon Sep 17 00:00:00 2001 From: Michiel De Smet Date: Wed, 21 Sep 2022 20:38:08 +0200 Subject: [PATCH 2/2] Support `on_schema_change` for incremental models --- .../unreleased/Features-20220922-085214.yaml | 7 + README.md | 2 + dbt/include/trino/macros/adapters.sql | 24 ++ .../macros/materializations/incremental.sql | 7 +- .../adapter/materialization/fixtures.py | 330 ++++++++++++++++++ .../test_incremental_schema.py | 217 ++++++++++++ 6 files changed, 586 insertions(+), 1 deletion(-) create mode 100644 .changes/unreleased/Features-20220922-085214.yaml create mode 100644 tests/functional/adapter/materialization/test_incremental_schema.py diff --git a/.changes/unreleased/Features-20220922-085214.yaml b/.changes/unreleased/Features-20220922-085214.yaml new file mode 100644 index 00000000..ae750dff --- /dev/null +++ b/.changes/unreleased/Features-20220922-085214.yaml @@ -0,0 +1,7 @@ +kind: Features +body: Support `on_schema_change` on incremental models +time: 2022-09-22T08:52:14.6753+02:00 +custom: 
+ Author: mdesmet + Issue: "48" + PR: "134" diff --git a/README.md b/README.md index 0040b12f..9f7a8b93 100644 --- a/README.md +++ b/README.md @@ -219,6 +219,8 @@ select * from {{ ref('events') }} {% endif %} ``` +Use the `+on_schema_change` property to define how dbt-trino should handle column changes. See [dbt docs](https://docs.getdbt.com/docs/building-a-dbt-project/building-models/configuring-incremental-models#what-if-the-columns-of-my-incremental-model-change). + ###### `append` (default) The default incremental strategy is `append`. `append` only adds the new records based on the condition specified in the `is_incremental()` conditional block. diff --git a/dbt/include/trino/macros/adapters.sql b/dbt/include/trino/macros/adapters.sql index 2e1af01c..9e3b2126 100644 --- a/dbt/include/trino/macros/adapters.sql +++ b/dbt/include/trino/macros/adapters.sql @@ -202,3 +202,27 @@ {{ return('%s') }} {%- endif -%} {% endmacro %} + + +{% macro trino__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %} + {% if add_columns is none %} + {% set add_columns = [] %} + {% endif %} + {% if remove_columns is none %} + {% set remove_columns = [] %} + {% endif %} + + {% for column in add_columns %} + {% set sql -%} + alter {{ relation.type }} {{ relation }} add column {{ column.name }} {{ column.data_type }} + {%- endset -%} + {% do run_query(sql) %} + {% endfor %} + + {% for column in remove_columns %} + {% set sql -%} + alter {{ relation.type }} {{ relation }} drop column {{ column.name }} + {%- endset -%} + {% do run_query(sql) %} + {% endfor %} +{% endmacro %} diff --git a/dbt/include/trino/macros/materializations/incremental.sql b/dbt/include/trino/macros/materializations/incremental.sql index 52319831..82c6ef82 100644 --- a/dbt/include/trino/macros/materializations/incremental.sql +++ b/dbt/include/trino/macros/materializations/incremental.sql @@ -126,6 +126,7 @@ {#-- Validate early so we don't run SQL if the strategy is invalid --#} {%- set strategy 
= validate_get_incremental_strategy(config) -%} + {%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%} {{ run_hooks(pre_hooks) }} @@ -138,7 +139,11 @@ {% set drop_tmp_relation_sql = "drop table if exists " ~ tmp_relation %} {% do run_query(drop_tmp_relation_sql) %} {% do run_query(create_table_as(True, tmp_relation, sql)) %} - {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} + {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#} + {% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %} + {% if not dest_columns %} + {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} + {% endif %} {% set build_sql = get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %} {% endif %} diff --git a/tests/functional/adapter/materialization/fixtures.py b/tests/functional/adapter/materialization/fixtures.py index 4644e798..dd2c6372 100644 --- a/tests/functional/adapter/materialization/fixtures.py +++ b/tests/functional/adapter/materialization/fixtures.py @@ -23,3 +23,333 @@ tests: - not_null """ + + +schema_base_yml = """\ +version: 2 + +models: + - name: model_a + columns: + - name: id + tests: + - unique + + - name: incremental_ignore + columns: + - name: id + tests: + - unique + + - name: incremental_ignore_target + columns: + - name: id + tests: + - unique + + - name: incremental_append_new_columns + columns: + - name: id + tests: + - unique + + - name: incremental_append_new_columns_target + columns: + - name: id + tests: + - unique + + - name: incremental_append_new_columns_remove_one + columns: + - name: id + tests: + - unique + + - name: incremental_append_new_columns_remove_one_target + columns: + - name: id + tests: + - unique + + - name: incremental_sync_all_columns + columns: + - name: id + tests: + - unique + + - name: 
incremental_sync_all_columns_target + columns: + - name: id + tests: + - unique +""" + +model_a_sql = """\ +{{ + config(materialized='table') +}} + +with source_data as ( + + select 1 as id, 'aaa' as field1, 'bbb' as field2, 111 as field3, 'TTT' as field4 + union all select 2 as id, 'ccc' as field1, 'ddd' as field2, 222 as field3, 'UUU' as field4 + union all select 3 as id, 'eee' as field1, 'fff' as field2, 333 as field3, 'VVV' as field4 + union all select 4 as id, 'ggg' as field1, 'hhh' as field2, 444 as field3, 'WWW' as field4 + union all select 5 as id, 'iii' as field1, 'jjj' as field2, 555 as field3, 'XXX' as field4 + union all select 6 as id, 'kkk' as field1, 'lll' as field2, 666 as field3, 'YYY' as field4 + +) + +select id + ,field1 + ,field2 + ,field3 + ,field4 + +from source_data +""" + +incremental_ignore_sql = """\ +{{ + config( + materialized='incremental', + unique_key='id', + on_schema_change='ignore' + ) +}} + +WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) + +{% if is_incremental() %} + +SELECT id, field1, field2, field3, field4 FROM source_data WHERE id NOT IN (SELECT id from {{ this }} ) + +{% else %} + +SELECT id, field1, field2 FROM source_data LIMIT 3 + +{% endif %} +""" + +incremental_ignore_target_sql = """\ +{{ + config(materialized='table') +}} + +with source_data as ( + + select * from {{ ref('model_a') }} + +) + +select id + ,field1 + ,field2 + +from source_data +""" + +incremental_append_new_columns = """\ +{{ + config( + materialized='incremental', + unique_key='id', + on_schema_change='append_new_columns' + ) +}} + +WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) + +{% if is_incremental() %} + +SELECT id, + cast(field1 as varchar) as field1, + cast(field2 as varchar) as field2, + cast(field3 as varchar) as field3, + cast(field4 as varchar) as field4 +FROM source_data WHERE id NOT IN (SELECT id from {{ this }} ) + +{% else %} + +SELECT id, + cast(field1 as varchar) as field1, + cast(field2 as varchar) as field2 +FROM 
source_data where id <= 3 + +{% endif %} +""" + +incremental_append_new_columns_target_sql = """\ +{{ + config(materialized='table') +}} + +with source_data as ( + + select * from {{ ref('model_a') }} + +) + +select id + ,cast(field1 as varchar) as field1 + ,cast(field2 as varchar) as field2 + ,cast(CASE WHEN id <= 3 THEN NULL ELSE field3 END as varchar) AS field3 + ,cast(CASE WHEN id <= 3 THEN NULL ELSE field4 END as varchar) AS field4 + +from source_data +""" + +incremental_append_new_columns_remove_one_sql = """\ +{{ + config( + materialized='incremental', + unique_key='id', + on_schema_change='append_new_columns' + ) +}} + +WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) + +{% if is_incremental() %} + +SELECT id, + cast(field1 as varchar) as field1, + cast(field3 as varchar) as field3, + cast(field4 as varchar) as field4 +FROM source_data WHERE id NOT IN (SELECT id from {{ this }} ) + +{% else %} + +SELECT id, + cast(field1 as varchar) as field1, + cast(field2 as varchar) as field2 +FROM source_data where id <= 3 + +{% endif %} +""" + +incremental_append_new_columns_remove_one_target_sql = """\ +{{ + config(materialized='table') +}} +with source_data as ( + + select * from {{ ref('model_a') }} + +) + +select id, + cast(field1 as varchar) as field1, + cast(CASE WHEN id > 3 THEN NULL ELSE field2 END as varchar) AS field2, + cast(CASE WHEN id <= 3 THEN NULL ELSE field3 END as varchar) AS field3, + cast(CASE WHEN id <= 3 THEN NULL ELSE field4 END as varchar) AS field4 + +from source_data +""" + + +incremental_fail_sql = """\ +{{ + config( + materialized='incremental', + unique_key='id', + on_schema_change='fail' + ) +}} + +WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) + +{% if is_incremental() %} + +SELECT id, field1, field2 FROM source_data + +{% else %} + +SELECT id, field1, field3 FROm source_data + +{% endif %} +""" + +incremental_sync_all_columns_sql = """\ +{{ + config( + materialized='incremental', + unique_key='id', + 
on_schema_change='sync_all_columns' + + ) +}} + +WITH source_data AS (SELECT * FROM {{ ref('model_a') }} ) + +{% if is_incremental() %} + +SELECT id, + cast(field1 as varchar) as field1, + cast(field3 as varchar) as field3, -- to validate new fields + cast(field4 as varchar) AS field4 -- to validate new fields + +FROM source_data WHERE id NOT IN (SELECT id from {{ this }} ) + +{% else %} + +select id, + cast(field1 as varchar) as field1, + cast(field2 as varchar) as field2 + +from source_data where id <= 3 + +{% endif %} +""" + +incremental_sync_all_columns_target_sql = """\ +{{ + config(materialized='table') +}} + +with source_data as ( + + select * from {{ ref('model_a') }} + +) +select id + ,cast(field1 as varchar) as field1 + --,field2 + ,cast(case when id <= 3 then null else field3 end as varchar) as field3 + ,cast(case when id <= 3 then null else field4 end as varchar) as field4 + +from source_data +order by id +""" + +select_from_a_sql = "select * from {{ ref('model_a') }} where false" + +select_from_incremental_append_new_columns_sql = ( + "select * from {{ ref('incremental_append_new_columns') }} where false" +) + +select_from_incremental_append_new_columns_remove_one_sql = ( + "select * from {{ ref('incremental_append_new_columns_remove_one') }} where false" +) + +select_from_incremental_append_new_columns_remove_one_target_sql = ( + "select * from {{ ref('incremental_append_new_columns_remove_one_target') }} where false" +) + +select_from_incremental_append_new_columns_target_sql = ( + "select * from {{ ref('incremental_append_new_columns_target') }} where false" +) + +select_from_incremental_ignore_sql = "select * from {{ ref('incremental_ignore') }} where false" + +select_from_incremental_ignore_target_sql = ( + "select * from {{ ref('incremental_ignore_target') }} where false" +) + +select_from_incremental_sync_all_columns_sql = ( + "select * from {{ ref('incremental_sync_all_columns') }} where false" +) + 
+select_from_incremental_sync_all_columns_target_sql = (
+    "select * from {{ ref('incremental_sync_all_columns_target') }} where false"
+)
diff --git a/tests/functional/adapter/materialization/test_incremental_schema.py b/tests/functional/adapter/materialization/test_incremental_schema.py
new file mode 100644
index 00000000..6fb2e9c2
--- /dev/null
+++ b/tests/functional/adapter/materialization/test_incremental_schema.py
@@ -0,0 +1,235 @@
+import pytest
+from dbt.tests.util import check_relations_equal, run_dbt
+
+from tests.functional.adapter.materialization.fixtures import (
+    incremental_append_new_columns,
+    incremental_append_new_columns_remove_one_sql,
+    incremental_append_new_columns_remove_one_target_sql,
+    incremental_append_new_columns_target_sql,
+    incremental_fail_sql,
+    incremental_ignore_sql,
+    incremental_ignore_target_sql,
+    incremental_sync_all_columns_sql,
+    incremental_sync_all_columns_target_sql,
+    model_a_sql,
+    schema_base_yml,
+    select_from_a_sql,
+    select_from_incremental_append_new_columns_remove_one_sql,
+    select_from_incremental_append_new_columns_remove_one_target_sql,
+    select_from_incremental_append_new_columns_sql,
+    select_from_incremental_append_new_columns_target_sql,
+    select_from_incremental_ignore_sql,
+    select_from_incremental_ignore_target_sql,
+    select_from_incremental_sync_all_columns_sql,
+    select_from_incremental_sync_all_columns_target_sql,
+)
+
+
+# TODO Merge not supported in Galaxy yet
+# https://github.com/starburstdata/dbt-trino/issues/133
+@pytest.mark.skip_profile("starburst_galaxy")
+class OnSchemaChangeBase:
+    """Shared `on_schema_change` scenarios for incremental models.
+
+    Each scenario runs its models twice, so the second run takes the incremental
+    path with a changed column set, and compares the incremental model with a
+    table model that holds the expected result. Connector-specific subclasses
+    supply `project_config_update`.
+    """
+
+    # configuration in dbt_project.yml
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {"name": "on_schema_change"}
+
+    # everything that goes in the "models" directory
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "model_a.sql": model_a_sql,
+            "incremental_ignore.sql": incremental_ignore_sql,
+            "incremental_ignore_target.sql": incremental_ignore_target_sql,
+            "incremental_append_new_columns.sql": incremental_append_new_columns,
+            "incremental_append_new_columns_target.sql": incremental_append_new_columns_target_sql,
+            "incremental_append_new_columns_remove_one.sql": incremental_append_new_columns_remove_one_sql,
+            "incremental_append_new_columns_remove_one_target.sql": incremental_append_new_columns_remove_one_target_sql,
+            "incremental_fail.sql": incremental_fail_sql,
+            "incremental_sync_all_columns.sql": incremental_sync_all_columns_sql,
+            "incremental_sync_all_columns_target.sql": incremental_sync_all_columns_target_sql,
+            "schema.yml": schema_base_yml,
+        }
+
+    # everything that goes in the "tests" directory
+    @pytest.fixture(scope="class")
+    def tests(self):
+        return {
+            "select_from_a.sql": select_from_a_sql,
+            "select_from_incremental_append_new_columns.sql": select_from_incremental_append_new_columns_sql,
+            "select_from_incremental_append_new_columns_remove_one.sql": select_from_incremental_append_new_columns_remove_one_sql,
+            "select_from_incremental_append_new_columns_remove_one_target.sql": select_from_incremental_append_new_columns_remove_one_target_sql,
+            "select_from_incremental_append_new_columns_target.sql": select_from_incremental_append_new_columns_target_sql,
+            "select_from_incremental_ignore.sql": select_from_incremental_ignore_sql,
+            "select_from_incremental_ignore_target.sql": select_from_incremental_ignore_target_sql,
+            "select_from_incremental_sync_all_columns.sql": select_from_incremental_sync_all_columns_sql,
+            "select_from_incremental_sync_all_columns_target.sql": select_from_incremental_sync_all_columns_target_sql,
+        }
+
+    def list_tests_and_assert(self, include, exclude, expected_tests):
+        """Assert that `dbt ls` lists exactly `expected_tests` for the given selectors."""
+        list_args = ["ls", "--resource-type", "test"]
+        if include:
+            list_args.extend(("--select", include))
+        if exclude:
+            list_args.extend(("--exclude", exclude))
+        listed = run_dbt(list_args)
+        assert len(listed) == len(expected_tests)
+        # listed entries are dot-separated; compare on the last segment only
+        test_names = [name.split(".")[-1] for name in listed]
+        assert sorted(test_names) == sorted(expected_tests)
+
+    def run_tests_and_assert(
+        self, project, include, exclude, expected_tests, compare_source, compare_target
+    ):
+        """Run the selection twice, run its tests and compare source with target.
+
+        `exclude` is only applied to `dbt test`, not to `dbt run`.
+        """
+        run_args = ["run"]
+        if include:
+            # use `--select` like list_tests_and_assert, not the legacy `--models` alias
+            run_args.extend(("--select", include))
+        results_one = run_dbt(run_args)
+        results_two = run_dbt(run_args)
+
+        # callers always select three models: model_a, the incremental model, its target
+        assert len(results_one) == 3
+        assert len(results_two) == 3
+
+        test_args = ["test"]
+        if include:
+            test_args.extend(("--select", include))
+        if exclude:
+            test_args.extend(("--exclude", exclude))
+
+        results = run_dbt(test_args)
+        tests_run = [r.node.name for r in results]
+        assert len(tests_run) == len(expected_tests)
+        assert sorted(tests_run) == sorted(expected_tests)
+        check_relations_equal(project.adapter, [compare_source, compare_target])
+
+    def run_incremental_ignore(self, project):
+        select = "model_a incremental_ignore incremental_ignore_target"
+        compare_source = "incremental_ignore"
+        compare_target = "incremental_ignore_target"
+        exclude = None
+        expected = [
+            "select_from_a",
+            "select_from_incremental_ignore",
+            "select_from_incremental_ignore_target",
+            "unique_model_a_id",
+            "unique_incremental_ignore_id",
+            "unique_incremental_ignore_target_id",
+        ]
+
+        self.list_tests_and_assert(select, exclude, expected)
+        self.run_tests_and_assert(
+            project, select, exclude, expected, compare_source, compare_target
+        )
+
+    def run_incremental_append_new_columns(self, project):
+        select = "model_a incremental_append_new_columns incremental_append_new_columns_target"
+        compare_source = "incremental_append_new_columns"
+        compare_target = "incremental_append_new_columns_target"
+        exclude = None
+        expected = [
+            "select_from_a",
+            "select_from_incremental_append_new_columns",
+            "select_from_incremental_append_new_columns_target",
+            "unique_model_a_id",
+            "unique_incremental_append_new_columns_id",
+            "unique_incremental_append_new_columns_target_id",
+        ]
+        self.list_tests_and_assert(select, exclude, expected)
+        self.run_tests_and_assert(
+            project, select, exclude, expected, compare_source, compare_target
+        )
+
+    def run_incremental_append_new_columns_remove_one(self, project):
+        select = "model_a incremental_append_new_columns_remove_one incremental_append_new_columns_remove_one_target"
+        compare_source = "incremental_append_new_columns_remove_one"
+        compare_target = "incremental_append_new_columns_remove_one_target"
+        exclude = None
+        expected = [
+            "select_from_a",
+            "select_from_incremental_append_new_columns_remove_one",
+            "select_from_incremental_append_new_columns_remove_one_target",
+            "unique_model_a_id",
+            "unique_incremental_append_new_columns_remove_one_id",
+            "unique_incremental_append_new_columns_remove_one_target_id",
+        ]
+        self.run_tests_and_assert(
+            project, select, exclude, expected, compare_source, compare_target
+        )
+
+    def run_incremental_sync_all_columns(self, project):
+        select = "model_a incremental_sync_all_columns incremental_sync_all_columns_target"
+        compare_source = "incremental_sync_all_columns"
+        compare_target = "incremental_sync_all_columns_target"
+        exclude = None
+        expected = [
+            "select_from_a",
+            "select_from_incremental_sync_all_columns",
+            "select_from_incremental_sync_all_columns_target",
+            "unique_model_a_id",
+            "unique_incremental_sync_all_columns_id",
+            "unique_incremental_sync_all_columns_target_id",
+        ]
+        self.list_tests_and_assert(select, exclude, expected)
+        self.run_tests_and_assert(
+            project, select, exclude, expected, compare_source, compare_target
+        )
+
+    def run_incremental_fail_on_schema_change(self, _):
+        """Expect the second run to fail once the selected columns have changed."""
+        select = "model_a incremental_fail"
+        run_dbt(["run", "--select", select, "--full-refresh"])
+        results_two = run_dbt(["run", "--select", select], expect_pass=False)
+        # index 1 is expected to be incremental_fail, which refs (runs after) model_a
+        assert "Compilation Error" in results_two[1].message
+
+    def test_run_incremental_ignore(self, project):
+        self.run_incremental_ignore(project)
+
+    def test_run_incremental_append_new_columns(self, project):
+        self.run_incremental_append_new_columns(project)
+        self.run_incremental_append_new_columns_remove_one(project)
+
+    def test_run_incremental_sync_all_columns(self, project):
+        self.run_incremental_sync_all_columns(project)
+
+    def test_run_incremental_fail_on_schema_change(self, project):
+        self.run_incremental_fail_on_schema_change(project)
+
+
+@pytest.mark.iceberg
+class TestIcebergOnSchemaChange(OnSchemaChangeBase):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "name": "on_schema_change_iceberg",
+            "models": {"+incremental_strategy": "merge"},
+        }
+
+
+@pytest.mark.delta
+class TestDeltaOnSchemaChange(OnSchemaChangeBase):
+    @pytest.fixture(scope="class")
+    def project_config_update(self):
+        return {
+            "name": "on_schema_change_delta",
+            "models": {"+on_table_exists": "drop", "+incremental_strategy": "merge"},
+        }
+
+    @pytest.mark.xfail(reason="This connector does not support dropping columns")
+    def test_run_incremental_sync_all_columns(self, project):
+        super().test_run_incremental_sync_all_columns(project)