From fd26935c40f60f5b987aee369aaa1ec3cbbb9da0 Mon Sep 17 00:00:00 2001 From: trouze Date: Tue, 21 Mar 2023 18:31:31 -0500 Subject: [PATCH 1/2] ADAP-381: adds query tagging support and materializations to dbt-redshift --- .../unreleased/Features-20230321-181232.yaml | 6 + dbt/adapters/redshift/connections.py | 10 ++ dbt/adapters/redshift/impl.py | 1 + dbt/include/redshift/macros/adapters.sql | 41 ++++++ .../macros/materializations/incremental.sql | 96 ++++++++++++ .../redshift/macros/materializations/seed.sql | 64 ++++++++ .../macros/materializations/snapshot.sql | 104 +++++++++++++ .../macros/materializations/table.sql | 63 ++++++++ .../redshift/macros/materializations/test.sql | 52 +++++++ .../redshift/macros/materializations/view.sql | 71 +++++++++ tests/functional/adapter/test_query_tags.py | 137 ++++++++++++++++++ tests/unit/test_redshift_adapter.py | 8 + 12 files changed, 653 insertions(+) create mode 100644 .changes/unreleased/Features-20230321-181232.yaml create mode 100644 dbt/include/redshift/macros/materializations/incremental.sql create mode 100644 dbt/include/redshift/macros/materializations/seed.sql create mode 100644 dbt/include/redshift/macros/materializations/snapshot.sql create mode 100644 dbt/include/redshift/macros/materializations/table.sql create mode 100644 dbt/include/redshift/macros/materializations/test.sql create mode 100644 dbt/include/redshift/macros/materializations/view.sql create mode 100644 tests/functional/adapter/test_query_tags.py diff --git a/.changes/unreleased/Features-20230321-181232.yaml b/.changes/unreleased/Features-20230321-181232.yaml new file mode 100644 index 000000000..de111b198 --- /dev/null +++ b/.changes/unreleased/Features-20230321-181232.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Query tagging enabled via yaml and sql configs +time: 2023-03-21T18:12:32.4933313-05:00 +custom: + Author: trouze + Issue: "376" diff --git a/dbt/adapters/redshift/connections.py b/dbt/adapters/redshift/connections.py index f35a429da..4e74eea5a 100644 --- a/dbt/adapters/redshift/connections.py +++ b/dbt/adapters/redshift/connections.py @@ -60,6 +60,7 @@ class RedshiftCredentials(Credentials): role: Optional[str] = None sslmode: Optional[str] = None retries: int = 1 + query_tag: Optional[str] = None _ALIASES = {"dbname": "database", "pass": "password"} @@ -78,6 +79,7 @@ def _connection_keys(self): "cluster_id", "iam_profile", "sslmode", + "query_tag", ) @property @@ -121,6 +123,10 @@ def connect(): ) if self.credentials.role: c.cursor().execute("set role {}".format(self.credentials.role)) + if self.credentials.query_tag: + c.cursor().execute( + "set query_group to '{}'".format(self.credentials.query_tag) + ) return c elif method == RedshiftConnectionMethod.IAM: @@ -143,6 +149,10 @@ def connect(): ) if self.credentials.role: c.cursor().execute("set role {}".format(self.credentials.role)) + if self.credentials.query_tag: + c.cursor().execute( + "set query_group to '{}'".format(self.credentials.query_tag) + ) return c else: diff --git a/dbt/adapters/redshift/impl.py b/dbt/adapters/redshift/impl.py index 718f14b08..9c12a90b1 100644 --- a/dbt/adapters/redshift/impl.py +++ b/dbt/adapters/redshift/impl.py @@ -26,6 +26,7 @@ class RedshiftConfig(AdapterConfig): sort: Optional[str] = None bind: Optional[bool] = None backup: Optional[bool] = True + query_tag: Optional[str] = None class RedshiftAdapter(SQLAdapter): diff --git a/dbt/include/redshift/macros/adapters.sql b/dbt/include/redshift/macros/adapters.sql index ebf2e16a5..c3a347251 100644 --- a/dbt/include/redshift/macros/adapters.sql +++ b/dbt/include/redshift/macros/adapters.sql @@ -265,6 +265,47 @@ {% endmacro %} +{% macro get_current_query_tag() -%} + {{ return(run_query("show query_group").rows[0]['query_group']) }} +{% endmacro %} + + +{% macro set_query_tag() -%} + {{ return(adapter.dispatch('set_query_tag', 'dbt')()) }} +{% endmacro %} + + +{% macro redshift__set_query_tag() -%} + {% set new_query_tag = config.get('query_tag') %} + {% if new_query_tag %} + {% set original_query_tag = get_current_query_tag() %} + {{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }} + {% do run_query("set query_group to '{}'".format(new_query_tag)) %} + {{ return(original_query_tag)}} + {% endif %} + {{ return(none)}} +{% endmacro %} + + +{% macro unset_query_tag(original_query_tag) -%} + {{ return(adapter.dispatch('unset_query_tag', 'dbt')(original_query_tag)) }} +{% endmacro %} + + +{% macro redshift__unset_query_tag(original_query_tag) -%} + {% set new_query_tag = config.get('query_tag') %} + {% if new_query_tag %} + {% if original_query_tag %} + {{ log("Resetting query_tag to '" ~ original_query_tag ~ "'.") }} + {% do run_query("set query_group to '{}'".format(original_query_tag)) %} + {% else %} + {{ log("No original query_tag, unsetting parameter.") }} + {% do run_query("reset query_group") %} + {% endif %} + {% endif %} +{% endmacro %} + + {% macro redshift__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %} {% if add_columns %} diff --git a/dbt/include/redshift/macros/materializations/incremental.sql b/dbt/include/redshift/macros/materializations/incremental.sql new file mode 100644 index 000000000..0b08d6f60 --- /dev/null +++ b/dbt/include/redshift/macros/materializations/incremental.sql @@ -0,0 +1,96 @@ + +{% materialization incremental, adapter='redshift' -%} + + {% set original_query_tag = set_query_tag() %} + + -- relations + {%- set existing_relation = load_cached_relation(this) -%} + {%- set target_relation = this.incorporate(type='table') -%} + {%- set temp_relation = make_temp_relation(target_relation)-%} + {%- set intermediate_relation = make_intermediate_relation(target_relation)-%} + {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%} + {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} + + -- configs + {%- set unique_key = config.get('unique_key') -%} + {%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%} + {%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%} + + -- the temp_ and backup_ relations should not already exist in the database; get_relation + -- will return None in that case. Otherwise, we get a relation that we can drop + -- later, before we try to use this name for the current operation. This has to happen before + -- BEGIN, in a separate transaction + {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%} + {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} + -- grab current tables grants config for comparision later on + {% set grant_config = config.get('grants') %} + {{ drop_relation_if_exists(preexisting_intermediate_relation) }} + {{ drop_relation_if_exists(preexisting_backup_relation) }} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% set to_drop = [] %} + + {% if existing_relation is none %} + {% set build_sql = get_create_table_as_sql(False, target_relation, sql) %} + {% elif full_refresh_mode %} + {% set build_sql = get_create_table_as_sql(False, intermediate_relation, sql) %} + {% set need_swap = true %} + {% else %} + {% do run_query(get_create_table_as_sql(True, temp_relation, sql)) %} + {% do adapter.expand_target_column_types( + from_relation=temp_relation, + to_relation=target_relation) %} + {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#} + {% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %} + {% if not dest_columns %} + {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} + {% endif %} + + {#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#} + {% set incremental_strategy = config.get('incremental_strategy') or 'default' %} + {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %} + {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %} + {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %} + {% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %} + + {% endif %} + + {% call statement("main") %} + {{ build_sql }} + {% endcall %} + + {% if need_swap %} + {% do adapter.rename_relation(target_relation, backup_relation) %} + {% do adapter.rename_relation(intermediate_relation, target_relation) %} + {% do to_drop.append(backup_relation) %} + {% endif %} + + {% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + + {% do persist_docs(target_relation, model) %} + + {% if existing_relation is none or existing_relation.is_view or should_full_refresh() %} + {% do create_indexes(target_relation) %} + {% endif %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + -- `COMMIT` happens here + {% do adapter.commit() %} + + {% for rel in to_drop %} + {% do adapter.drop_relation(rel) %} + {% endfor %} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {% do unset_query_tag(original_query_tag) %} + + {{ return({'relations': [target_relation]}) }} + +{%- endmaterialization %} diff --git a/dbt/include/redshift/macros/materializations/seed.sql b/dbt/include/redshift/macros/materializations/seed.sql new file mode 100644 index 000000000..51d9855e1 --- /dev/null +++ b/dbt/include/redshift/macros/materializations/seed.sql @@ -0,0 +1,64 @@ +{% materialization seed, adapter='redshift' %} + + {% set original_query_tag = set_query_tag() %} + + {%- set identifier = model['alias'] -%} + {%- set full_refresh_mode = (should_full_refresh()) -%} + + {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} + + {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%} + {%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%} + + {%- set grant_config = config.get('grants') -%} + {%- set agate_table = load_agate_table() -%} + -- grab current tables grants config for comparision later on + + {%- do store_result('agate_table', response='OK', agate_table=agate_table) -%} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + -- build model + {% set create_table_sql = "" %} + {% if exists_as_view %} + {{ exceptions.raise_compiler_error("Cannot seed to '{}', it is a view".format(old_relation)) }} + {% elif exists_as_table %} + {% set create_table_sql = reset_csv_table(model, full_refresh_mode, old_relation, agate_table) %} + {% else %} + {% set create_table_sql = create_csv_table(model, agate_table) %} + {% endif %} + + {% set code = 'CREATE' if full_refresh_mode else 'INSERT' %} + {% set rows_affected = (agate_table.rows | length) %} + {% set sql = load_csv_rows(model, agate_table) %} + + {% call noop_statement('main', code ~ ' ' ~ rows_affected, code, rows_affected) %} + {{ get_csv_sql(create_table_sql, sql) }}; + {% endcall %} + + {% set target_relation = this.incorporate(type='table') %} + + {% set should_revoke = should_revoke(old_relation, full_refresh_mode) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + + {% do persist_docs(target_relation, model) %} + + {% if full_refresh_mode or not exists_as_table %} + {% do create_indexes(target_relation) %} + {% endif %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + -- `COMMIT` happens here + {{ adapter.commit() }} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {% do unset_query_tag(original_query_tag) %} + + {{ return({'relations': [target_relation]}) }} + +{% endmaterialization %} diff --git a/dbt/include/redshift/macros/materializations/snapshot.sql b/dbt/include/redshift/macros/materializations/snapshot.sql new file mode 100644 index 000000000..2b31c520b --- /dev/null +++ b/dbt/include/redshift/macros/materializations/snapshot.sql @@ -0,0 +1,104 @@ +{% materialization snapshot, adapter='redshift' %} + + {% set original_query_tag = set_query_tag() %} + + {%- set config = model['config'] -%} + + {%- set target_table = model.get('alias', model.get('name')) -%} + + {%- set strategy_name = config.get('strategy') -%} + {%- set unique_key = config.get('unique_key') %} + -- grab current tables grants config for comparision later on + {%- set grant_config = config.get('grants') -%} + + {% set target_relation_exists, target_relation = get_or_create_relation( + database=model.database, + schema=model.schema, + identifier=target_table, + type='table') -%} + + {%- if not target_relation.is_table -%} + {% do exceptions.relation_wrong_type(target_relation, 'table') %} + {%- endif -%} + + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + {% set strategy_macro = strategy_dispatch(strategy_name) %} + {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %} + + {% if not target_relation_exists %} + + {% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %} + {% set final_sql = create_table_as(False, target_relation, build_sql) %} + + {% else %} + + {{ adapter.valid_snapshot_target(target_relation) }} + + {% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %} + + -- this may no-op if the database does not require column expansion + {% do adapter.expand_target_column_types(from_relation=staging_table, + to_relation=target_relation) %} + + {% set missing_columns = adapter.get_missing_columns(staging_table, target_relation) + | rejectattr('name', 'equalto', 'dbt_change_type') + | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') + | rejectattr('name', 'equalto', 'dbt_unique_key') + | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | list %} + + {% do create_columns(target_relation, missing_columns) %} + + {% set source_columns = adapter.get_columns_in_relation(staging_table) + | rejectattr('name', 'equalto', 'dbt_change_type') + | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') + | rejectattr('name', 'equalto', 'dbt_unique_key') + | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') + | list %} + + {% set quoted_source_columns = [] %} + {% for column in source_columns %} + {% do quoted_source_columns.append(adapter.quote(column.name)) %} + {% endfor %} + + {% set final_sql = snapshot_merge_sql( + target = target_relation, + source = staging_table, + insert_cols = quoted_source_columns + ) + %} + + {% endif %} + + {% call statement('main') %} + {{ final_sql }} + {% endcall %} + + {% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + + {% do persist_docs(target_relation, model) %} + + {% if not target_relation_exists %} + {% do create_indexes(target_relation) %} + {% endif %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + {{ adapter.commit() }} + + {% if staging_table is defined %} + {% do post_snapshot(staging_table) %} + {% endif %} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {% do unset_query_tag(original_query_tag) %} + + {{ return({'relations': [target_relation]}) }} + +{% endmaterialization %} diff --git a/dbt/include/redshift/macros/materializations/table.sql b/dbt/include/redshift/macros/materializations/table.sql new file mode 100644 index 000000000..4c4276b25 --- /dev/null +++ b/dbt/include/redshift/macros/materializations/table.sql @@ -0,0 +1,63 @@ +{% materialization table, adapter='redshift' %} + + {% set original_query_tag = set_query_tag() %} + + {%- set existing_relation = load_cached_relation(this) -%} + {%- set target_relation = this.incorporate(type='table') %} + {%- set intermediate_relation = make_intermediate_relation(target_relation) -%} + -- the intermediate_relation should not already exist in the database; get_relation + -- will return None in that case. Otherwise, we get a relation that we can drop + -- later, before we try to use this name for the current operation + {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%} + /* + See ../view/view.sql for more information about this relation. + */ + {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%} + {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} + -- as above, the backup_relation should not already exist + {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} + -- grab current tables grants config for comparision later on + {% set grant_config = config.get('grants') %} + + -- drop the temp relations if they exist already in the database + {{ drop_relation_if_exists(preexisting_intermediate_relation) }} + {{ drop_relation_if_exists(preexisting_backup_relation) }} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + -- build model + {% call statement('main') -%} + {{ get_create_table_as_sql(False, intermediate_relation, sql) }} + {%- endcall %} + + -- cleanup + {% if existing_relation is not none %} + {{ adapter.rename_relation(existing_relation, backup_relation) }} + {% endif %} + + {{ adapter.rename_relation(intermediate_relation, target_relation) }} + + {% do create_indexes(target_relation) %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + + {% do persist_docs(target_relation, model) %} + + -- `COMMIT` happens here + {{ adapter.commit() }} + + -- finally, drop the existing/backup relation after the commit + {{ drop_relation_if_exists(backup_relation) }} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {% do unset_query_tag(original_query_tag) %} + + {{ return({'relations': [target_relation]}) }} +{% endmaterialization %} diff --git a/dbt/include/redshift/macros/materializations/test.sql b/dbt/include/redshift/macros/materializations/test.sql new file mode 100644 index 000000000..8613364c0 --- /dev/null +++ b/dbt/include/redshift/macros/materializations/test.sql @@ -0,0 +1,52 @@ +{%- materialization test, adapter='redshift' -%} + + {% set original_query_tag = set_query_tag() %} + + {% set relations = [] %} + + {% if should_store_failures() %} + + {% set identifier = model['alias'] %} + {% set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) %} + {% set target_relation = api.Relation.create( + identifier=identifier, schema=schema, database=database, type='table') -%} %} + + {% if old_relation %} + {% do adapter.drop_relation(old_relation) %} + {% endif %} + + {% call statement(auto_begin=True) %} + {{ create_table_as(False, target_relation, sql) }} + {% endcall %} + + {% do relations.append(target_relation) %} + + {% set main_sql %} + select * + from {{ target_relation }} + {% endset %} + + {{ adapter.commit() }} + + {% else %} + + {% set main_sql = sql %} + + {% endif %} + + {% set limit = config.get('limit') %} + {% set fail_calc = config.get('fail_calc') %} + {% set warn_if = config.get('warn_if') %} + {% set error_if = config.get('error_if') %} + + {% call statement('main', fetch_result=True) -%} + + {{ get_test_sql(main_sql, fail_calc, warn_if, error_if, limit)}} + + {%- endcall %} + + {% do unset_query_tag(original_query_tag) %} + + {{ return({'relations': relations}) }} + +{%- endmaterialization -%} diff --git a/dbt/include/redshift/macros/materializations/view.sql b/dbt/include/redshift/macros/materializations/view.sql new file mode 100644 index 000000000..04cf72bb2 --- /dev/null +++ b/dbt/include/redshift/macros/materializations/view.sql @@ -0,0 +1,71 @@ +{%- materialization view, adapter='redshift' -%} + + {% set original_query_tag = set_query_tag() %} + + {%- set existing_relation = load_cached_relation(this) -%} + {%- set target_relation = this.incorporate(type='view') -%} + {%- set intermediate_relation = make_intermediate_relation(target_relation) -%} + + -- the intermediate_relation should not already exist in the database; get_relation + -- will return None in that case. Otherwise, we get a relation that we can drop + -- later, before we try to use this name for the current operation + {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%} + /* + This relation (probably) doesn't exist yet. If it does exist, it's a leftover from + a previous run, and we're going to try to drop it immediately. At the end of this + materialization, we're going to rename the "existing_relation" to this identifier, + and then we're going to drop it. In order to make sure we run the correct one of: + - drop view ... + - drop table ... + + We need to set the type of this relation to be the type of the existing_relation, if it exists, + or else "view" as a sane default if it does not. Note that if the existing_relation does not + exist, then there is nothing to move out of the way and subsequentally drop. In that case, + this relation will be effectively unused. + */ + {%- set backup_relation_type = 'view' if existing_relation is none else existing_relation.type -%} + {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} + -- as above, the backup_relation should not already exist + {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} + -- grab current tables grants config for comparision later on + {% set grant_config = config.get('grants') %} + + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- drop the temp relations if they exist already in the database + {{ drop_relation_if_exists(preexisting_intermediate_relation) }} + {{ drop_relation_if_exists(preexisting_backup_relation) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + + -- build model + {% call statement('main') -%} + {{ get_create_view_as_sql(intermediate_relation, sql) }} + {%- endcall %} + + -- cleanup + -- move the existing view out of the way + {% if existing_relation is not none %} + {{ adapter.rename_relation(existing_relation, backup_relation) }} + {% endif %} + {{ adapter.rename_relation(intermediate_relation, target_relation) }} + + {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + + {% do persist_docs(target_relation, model) %} + + {{ run_hooks(post_hooks, inside_transaction=True) }} + + {{ adapter.commit() }} + + {{ drop_relation_if_exists(backup_relation) }} + + {{ run_hooks(post_hooks, inside_transaction=False) }} + + {% do unset_query_tag(original_query_tag) %} + + {{ return({'relations': [target_relation]}) }} + +{%- endmaterialization -%} diff --git a/tests/functional/adapter/test_query_tags.py b/tests/functional/adapter/test_query_tags.py new file mode 100644 index 000000000..21ef6c484 --- /dev/null +++ b/tests/functional/adapter/test_query_tags.py @@ -0,0 +1,137 @@ +import pytest +from dbt.tests.util import run_dbt + +snapshots__snapshot_query_tag_sql = """ +{% snapshot snapshot_query_tag %} + {{ + config( + target_database=database, + target_schema=schema, + unique_key='id', + strategy='check', + check_cols=['color'], + ) + }} + select 1 as id, 'blue'::varchar(4) as color +{% endsnapshot %} + +""" + +models__table_model_query_tag_sql = """ +{{ config(materialized = 'table') }} + +select 1 as id + +""" + +models__models_config_yml = """ +version: 2 + +models: + - name: view_model_query_tag + columns: + - name: id + tests: + - unique + +""" + +models__view_model_query_tag_sql = """ +{{ config(materialized = 'view') }} + +select 1 as id + +""" + +models__incremental_model_query_tag_sql = """ +{{ config(materialized = 'incremental', unique_key = 'id') }} + +select 1 as id + +""" + +macros__check_tag_sql = """ +{% macro check_query_tag() %} + + {% if execute %} + {% set query_tag = get_current_query_tag() %} + {% if query_tag != var("query_tag") %} + {{ exceptions.raise_compiler_error("Query tag not used!") }} + {% endif %} + {% endif %} + +{% endmacro %} + +""" + +seeds__seed_query_tag_csv = """id +1 +""" + + +class TestQueryTag: + @pytest.fixture(scope="class") + def models(self): + return { + "table_model_query_tag.sql": models__table_model_query_tag_sql, + "view_model_query_tag.sql": models__view_model_query_tag_sql, + "incremental_model_query_tag.sql": models__incremental_model_query_tag_sql, + "models_config.yml": models__models_config_yml, + } + + @pytest.fixture(scope="class") + def snapshots(self): + return {"snapshot_query_tag.sql": snapshots__snapshot_query_tag_sql} + + @pytest.fixture(scope="class") + def macros(self): + return {"check_tag.sql": macros__check_tag_sql} + + @pytest.fixture(scope="class") + def seeds(self): + return {"seed_query_tag.csv": seeds__seed_query_tag_csv} + + @pytest.fixture(scope="class") + def project_config_update(self, prefix): + return { + "config-version": 2, + "models": { + "tests": {"query_tag": prefix, "post-hook": "{{ check_tag() }}"}, + }, + "seeds": { + "tests": {"query_tag": prefix, "post-hook": "{{ check_tag() }}"}, + }, + "snapshots": { + "tests": {"query_tag": prefix, "post-hook": "{{ check_tag() }}"}, + }, + "tests": {"test": {"query_tag": prefix, "post-hook": "{{ check_query_tag() }}"}}, + } + + def build_all_with_query_tags(self, project, prefix): + run_dbt(["build", "--vars", '{{"check_tag": "{}"}}'.format(prefix)]) + + def test_redshift_query_tag(self, project, prefix): + self.build_all_with_query_tags(project, prefix) + self.build_all_with_query_tags(project, prefix) + + +class TestRedshiftProfileQueryTag: + @pytest.fixture(scope="class") + def models(self): + return { + "table_model_query_tag.sql": models__table_model_query_tag_sql, + "view_model_query_tag.sql": models__view_model_query_tag_sql, + "incremental_model_query_tag.sql": models__incremental_model_query_tag_sql, + "models_config.yml": models__models_config_yml, + } + + @pytest.fixture(scope="class") + def profiles_config_update(self, prefix): + return {"query_tag": prefix} + + def build_all_with_query_tags(self, project, prefix): + run_dbt(["build", "--vars", '{{"check_tag": "{}"}}'.format(prefix)]) + + def test_redshift_query_tag(self, project, prefix): + self.build_all_with_query_tags(project, prefix) + self.build_all_with_query_tags(project, prefix) diff --git a/tests/unit/test_redshift_adapter.py b/tests/unit/test_redshift_adapter.py index 27bcd98f8..c162791fc 100644 --- a/tests/unit/test_redshift_adapter.py +++ b/tests/unit/test_redshift_adapter.py @@ -359,6 +359,14 @@ def test_add_query_success(self): "select * from test3", True, bindings=None, abridge_sql_log=False ) + def test_query_tagging(self): + self.config.credentials = self.config.credentials.replace(query_tag="test_query_tag") + + expected_connection_info = [ + (k, v) for (k, v) in self.config.credentials.connection_info() if k == "query_tag" + ] + self.assertEqual([("query_tag", "test_query_tag")], expected_connection_info) + class TestRedshiftAdapterConversions(TestAdapterConversions): def test_convert_text_type(self): From 42e8d2d4fd860bc62b5f0cbaedc4c6a2850dfc6f Mon Sep 17 00:00:00 2001 From: Tyler Rouze Date: Fri, 2 Jun 2023 22:40:01 +0000 Subject: [PATCH 2/2] remove custom matls --- dbt/include/redshift/macros/adapters.sql | 10 -- .../macros/materializations/incremental.sql | 96 ---------------- .../redshift/macros/materializations/seed.sql | 64 ----------- .../macros/materializations/snapshot.sql | 104 ------------------ .../macros/materializations/table.sql | 63 ----------- .../redshift/macros/materializations/test.sql | 52 --------- .../redshift/macros/materializations/view.sql | 71 ------------ 7 files changed, 460 deletions(-) delete mode 100644 dbt/include/redshift/macros/materializations/incremental.sql delete mode 100644 dbt/include/redshift/macros/materializations/seed.sql delete mode 100644 dbt/include/redshift/macros/materializations/snapshot.sql delete mode 100644 dbt/include/redshift/macros/materializations/table.sql delete mode 100644 dbt/include/redshift/macros/materializations/test.sql delete mode 100644 dbt/include/redshift/macros/materializations/view.sql diff --git a/dbt/include/redshift/macros/adapters.sql b/dbt/include/redshift/macros/adapters.sql index c3a347251..8a9635445 100644 --- a/dbt/include/redshift/macros/adapters.sql +++ b/dbt/include/redshift/macros/adapters.sql @@ -270,11 +270,6 @@ {% endmacro %} -{% macro set_query_tag() -%} - {{ return(adapter.dispatch('set_query_tag', 'dbt')()) }} -{% endmacro %} - - {% macro redshift__set_query_tag() -%} {% set new_query_tag = config.get('query_tag') %} {% if new_query_tag %} @@ -287,11 +282,6 @@ {% endmacro %} -{% macro unset_query_tag(original_query_tag) -%} - {{ return(adapter.dispatch('unset_query_tag', 'dbt')(original_query_tag)) }} -{% endmacro %} - - {% macro redshift__unset_query_tag(original_query_tag) -%} {% set new_query_tag = config.get('query_tag') %} {% if new_query_tag %} diff --git a/dbt/include/redshift/macros/materializations/incremental.sql b/dbt/include/redshift/macros/materializations/incremental.sql deleted file mode 100644 index 0b08d6f60..000000000 --- a/dbt/include/redshift/macros/materializations/incremental.sql +++ /dev/null @@ -1,96 +0,0 @@ - -{% materialization incremental, adapter='redshift' -%} - - {% set original_query_tag = set_query_tag() %} - - -- relations - {%- set existing_relation = load_cached_relation(this) -%} - {%- set target_relation = this.incorporate(type='table') -%} - {%- set temp_relation = make_temp_relation(target_relation)-%} - {%- set intermediate_relation = make_intermediate_relation(target_relation)-%} - {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%} - {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} - - -- configs - {%- set unique_key = config.get('unique_key') -%} - {%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%} - {%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%} - - -- the temp_ and backup_ relations should not already exist in the database; get_relation - -- will return None in that case. Otherwise, we get a relation that we can drop - -- later, before we try to use this name for the current operation. This has to happen before - -- BEGIN, in a separate transaction - {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%} - {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} - -- grab current tables grants config for comparision later on - {% set grant_config = config.get('grants') %} - {{ drop_relation_if_exists(preexisting_intermediate_relation) }} - {{ drop_relation_if_exists(preexisting_backup_relation) }} - - {{ run_hooks(pre_hooks, inside_transaction=False) }} - - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} - - {% set to_drop = [] %} - - {% if existing_relation is none %} - {% set build_sql = get_create_table_as_sql(False, target_relation, sql) %} - {% elif full_refresh_mode %} - {% set build_sql = get_create_table_as_sql(False, intermediate_relation, sql) %} - {% set need_swap = true %} - {% else %} - {% do run_query(get_create_table_as_sql(True, temp_relation, sql)) %} - {% do adapter.expand_target_column_types( - from_relation=temp_relation, - to_relation=target_relation) %} - {#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#} - {% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %} - {% if not dest_columns %} - {% set dest_columns = adapter.get_columns_in_relation(existing_relation) %} - {% endif %} - - {#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#} - {% set incremental_strategy = config.get('incremental_strategy') or 'default' %} - {% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %} - {% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %} - {% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %} - {% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %} - - {% endif %} - - {% call statement("main") %} - {{ build_sql }} - {% endcall %} - - {% if need_swap %} - {% do adapter.rename_relation(target_relation, backup_relation) %} - {% do adapter.rename_relation(intermediate_relation, target_relation) %} - {% do to_drop.append(backup_relation) %} - {% endif %} - - {% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %} - {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} - - {% do persist_docs(target_relation, model) %} - - {% if existing_relation is none or existing_relation.is_view or should_full_refresh() %} - {% do create_indexes(target_relation) %} - {% endif %} - - {{ run_hooks(post_hooks, inside_transaction=True) }} - - -- `COMMIT` happens here - {% do adapter.commit() %} - - {% for rel in to_drop %} - {% do adapter.drop_relation(rel) %} - {% endfor %} - - {{ run_hooks(post_hooks, inside_transaction=False) }} - - {% do unset_query_tag(original_query_tag) %} - - {{ return({'relations': [target_relation]}) }} - -{%- endmaterialization %} diff --git a/dbt/include/redshift/macros/materializations/seed.sql b/dbt/include/redshift/macros/materializations/seed.sql deleted file mode 100644 index 51d9855e1..000000000 --- a/dbt/include/redshift/macros/materializations/seed.sql +++ /dev/null @@ -1,64 +0,0 @@ -{% materialization seed, adapter='redshift' %} - - {% set original_query_tag = set_query_tag() %} - - {%- set identifier = model['alias'] -%} - {%- set full_refresh_mode = (should_full_refresh()) -%} - - {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} - - {%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%} - {%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%} - - {%- set grant_config = config.get('grants') -%} - {%- set agate_table = load_agate_table() -%} - -- grab current tables grants config for comparision later on - - {%- do store_result('agate_table', response='OK', agate_table=agate_table) -%} - - {{ run_hooks(pre_hooks, inside_transaction=False) }} - - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} - - -- build model - {% set create_table_sql = "" %} - {% if exists_as_view %} - {{ exceptions.raise_compiler_error("Cannot seed to '{}', it is a view".format(old_relation)) }} - {% elif exists_as_table %} - {% set create_table_sql = reset_csv_table(model, full_refresh_mode, old_relation, agate_table) %} - {% else %} - {% set create_table_sql = create_csv_table(model, agate_table) %} - {% endif %} - - {% set code = 'CREATE' if full_refresh_mode else 'INSERT' %} - {% set rows_affected = (agate_table.rows | length) %} - {% set sql = load_csv_rows(model, agate_table) %} - - {% call noop_statement('main', code ~ ' ' ~ rows_affected, code, rows_affected) %} - {{ get_csv_sql(create_table_sql, sql) }}; - {% endcall %} - - {% set target_relation = this.incorporate(type='table') %} - - {% set should_revoke = should_revoke(old_relation, full_refresh_mode) %} - {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} - - {% do persist_docs(target_relation, model) %} - - {% if full_refresh_mode or not exists_as_table %} - {% do create_indexes(target_relation) %} - {% endif %} - - {{ run_hooks(post_hooks, inside_transaction=True) }} - - -- `COMMIT` happens here - {{ adapter.commit() }} - - {{ run_hooks(post_hooks, inside_transaction=False) }} - - {% do unset_query_tag(original_query_tag) %} - - {{ return({'relations': [target_relation]}) }} - -{% endmaterialization %} diff --git a/dbt/include/redshift/macros/materializations/snapshot.sql b/dbt/include/redshift/macros/materializations/snapshot.sql deleted file mode 100644 index 2b31c520b..000000000 --- a/dbt/include/redshift/macros/materializations/snapshot.sql +++ /dev/null @@ -1,104 +0,0 @@ -{% materialization snapshot, adapter='redshift' %} - - {% set original_query_tag = set_query_tag() %} - - {%- set config = model['config'] -%} - - {%- set target_table = model.get('alias', model.get('name')) -%} - - {%- set strategy_name = config.get('strategy') -%} - {%- set unique_key = config.get('unique_key') %} - -- grab current tables grants config for comparision later on - {%- set grant_config = config.get('grants') -%} - - {% set target_relation_exists, target_relation = get_or_create_relation( - database=model.database, - schema=model.schema, - identifier=target_table, - type='table') -%} - - {%- if not target_relation.is_table -%} - {% do exceptions.relation_wrong_type(target_relation, 'table') %} - {%- endif -%} - - - {{ run_hooks(pre_hooks, inside_transaction=False) }} - - {{ run_hooks(pre_hooks, inside_transaction=True) }} - - {% set strategy_macro = strategy_dispatch(strategy_name) %} - {% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %} - - {% if not target_relation_exists %} - - {% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %} - {% set final_sql = create_table_as(False, target_relation, build_sql) %} - - {% else %} - - {{ adapter.valid_snapshot_target(target_relation) }} - - {% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %} - - -- this may no-op if the database does not require column expansion - {% do adapter.expand_target_column_types(from_relation=staging_table, - to_relation=target_relation) %} - - {% set missing_columns = adapter.get_missing_columns(staging_table, target_relation) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') - | list %} - - {% do create_columns(target_relation, missing_columns) %} - - {% set source_columns = adapter.get_columns_in_relation(staging_table) - | rejectattr('name', 'equalto', 'dbt_change_type') - | rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE') - | rejectattr('name', 'equalto', 'dbt_unique_key') - | rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY') - | list %} - - {% set quoted_source_columns = [] %} - {% for column in source_columns %} - {% do quoted_source_columns.append(adapter.quote(column.name)) %} - {% endfor %} - - {% set final_sql = snapshot_merge_sql( - target = target_relation, - source = staging_table, - insert_cols = quoted_source_columns - ) - %} - - {% endif %} - - {% call statement('main') %} - {{ final_sql }} - {% endcall %} - - {% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %} - {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} - - {% do persist_docs(target_relation, model) %} - - {% if not target_relation_exists %} - {% do create_indexes(target_relation) %} - {% endif %} - - {{ run_hooks(post_hooks, inside_transaction=True) }} - - {{ adapter.commit() }} - - {% if staging_table is defined %} - {% do post_snapshot(staging_table) %} - {% endif %} - - {{ run_hooks(post_hooks, inside_transaction=False) }} - - {% do unset_query_tag(original_query_tag) %} - - {{ return({'relations': [target_relation]}) }} - -{% endmaterialization %} diff --git a/dbt/include/redshift/macros/materializations/table.sql b/dbt/include/redshift/macros/materializations/table.sql deleted file mode 100644 index 4c4276b25..000000000 --- a/dbt/include/redshift/macros/materializations/table.sql +++ /dev/null @@ -1,63 +0,0 @@ -{% materialization table, adapter='redshift' %} - - {% set original_query_tag = set_query_tag() %} - - {%- set existing_relation = load_cached_relation(this) -%} - {%- set target_relation = this.incorporate(type='table') %} - {%- set intermediate_relation = make_intermediate_relation(target_relation) -%} - -- the intermediate_relation should not already exist in the database; get_relation - -- will return None in that case. Otherwise, we get a relation that we can drop - -- later, before we try to use this name for the current operation - {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%} - /* - See ../view/view.sql for more information about this relation. - */ - {%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%} - {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} - -- as above, the backup_relation should not already exist - {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} - -- grab current tables grants config for comparision later on - {% set grant_config = config.get('grants') %} - - -- drop the temp relations if they exist already in the database - {{ drop_relation_if_exists(preexisting_intermediate_relation) }} - {{ drop_relation_if_exists(preexisting_backup_relation) }} - - {{ run_hooks(pre_hooks, inside_transaction=False) }} - - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} - - -- build model - {% call statement('main') -%} - {{ get_create_table_as_sql(False, intermediate_relation, sql) }} - {%- endcall %} - - -- cleanup - {% if existing_relation is not none %} - {{ adapter.rename_relation(existing_relation, backup_relation) }} - {% endif %} - - {{ adapter.rename_relation(intermediate_relation, target_relation) }} - - {% do create_indexes(target_relation) %} - - {{ run_hooks(post_hooks, inside_transaction=True) }} - - {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} - {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} - - {% do persist_docs(target_relation, model) %} - - -- `COMMIT` happens here - {{ adapter.commit() }} - - -- finally, drop the existing/backup relation after the commit - {{ drop_relation_if_exists(backup_relation) }} - - {{ run_hooks(post_hooks, inside_transaction=False) }} - - {% do unset_query_tag(original_query_tag) %} - - {{ return({'relations': [target_relation]}) }} -{% endmaterialization %} diff --git a/dbt/include/redshift/macros/materializations/test.sql b/dbt/include/redshift/macros/materializations/test.sql deleted file mode 100644 index 8613364c0..000000000 --- a/dbt/include/redshift/macros/materializations/test.sql +++ /dev/null @@ -1,52 +0,0 @@ -{%- materialization test, adapter='redshift' -%} - - {% set original_query_tag = set_query_tag() %} - - {% set relations = [] %} - - {% if should_store_failures() %} - - {% set identifier = model['alias'] %} - {% set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) %} - {% set target_relation = api.Relation.create( - identifier=identifier, schema=schema, database=database, type='table') -%} %} - - {% if old_relation %} - {% do adapter.drop_relation(old_relation) %} - {% endif %} - - {% call statement(auto_begin=True) %} - {{ create_table_as(False, target_relation, sql) }} - {% endcall %} - - {% do relations.append(target_relation) %} - - {% set main_sql %} - select * - from {{ target_relation }} - {% endset %} - - {{ adapter.commit() }} - - {% else %} - - {% set main_sql = sql %} - - {% endif %} - - {% set limit = config.get('limit') %} - {% set fail_calc = config.get('fail_calc') %} - {% set warn_if = config.get('warn_if') %} - {% set error_if = config.get('error_if') %} - - {% call statement('main', fetch_result=True) -%} - - {{ get_test_sql(main_sql, fail_calc, warn_if, error_if, limit)}} - - {%- endcall %} - - {% do unset_query_tag(original_query_tag) %} - - {{ return({'relations': relations}) }} - -{%- endmaterialization -%} diff --git a/dbt/include/redshift/macros/materializations/view.sql b/dbt/include/redshift/macros/materializations/view.sql deleted file mode 100644 index 04cf72bb2..000000000 --- a/dbt/include/redshift/macros/materializations/view.sql +++ /dev/null @@ -1,71 +0,0 @@ -{%- materialization view, adapter='redshift' -%} - - {% set original_query_tag = set_query_tag() %} - - {%- set existing_relation = load_cached_relation(this) -%} - {%- set target_relation = this.incorporate(type='view') -%} - {%- set intermediate_relation = make_intermediate_relation(target_relation) -%} - - -- the intermediate_relation should not already exist in the database; get_relation - -- will return None in that case. Otherwise, we get a relation that we can drop - -- later, before we try to use this name for the current operation - {%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation) -%} - /* - This relation (probably) doesn't exist yet. If it does exist, it's a leftover from - a previous run, and we're going to try to drop it immediately. At the end of this - materialization, we're going to rename the "existing_relation" to this identifier, - and then we're going to drop it. In order to make sure we run the correct one of: - - drop view ... - - drop table ... - - We need to set the type of this relation to be the type of the existing_relation, if it exists, - or else "view" as a sane default if it does not. Note that if the existing_relation does not - exist, then there is nothing to move out of the way and subsequentally drop. In that case, - this relation will be effectively unused. - */ - {%- set backup_relation_type = 'view' if existing_relation is none else existing_relation.type -%} - {%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%} - -- as above, the backup_relation should not already exist - {%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%} - -- grab current tables grants config for comparision later on - {% set grant_config = config.get('grants') %} - - {{ run_hooks(pre_hooks, inside_transaction=False) }} - - -- drop the temp relations if they exist already in the database - {{ drop_relation_if_exists(preexisting_intermediate_relation) }} - {{ drop_relation_if_exists(preexisting_backup_relation) }} - - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} - - -- build model - {% call statement('main') -%} - {{ get_create_view_as_sql(intermediate_relation, sql) }} - {%- endcall %} - - -- cleanup - -- move the existing view out of the way - {% if existing_relation is not none %} - {{ adapter.rename_relation(existing_relation, backup_relation) }} - {% endif %} - {{ adapter.rename_relation(intermediate_relation, target_relation) }} - - {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} - {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} - - {% do persist_docs(target_relation, model) %} - - {{ run_hooks(post_hooks, inside_transaction=True) }} - - {{ adapter.commit() }} - - {{ drop_relation_if_exists(backup_relation) }} - - {{ run_hooks(post_hooks, inside_transaction=False) }} - - {% do unset_query_tag(original_query_tag) %} - - {{ return({'relations': [target_relation]}) }} - -{%- endmaterialization -%}