Skip to content

Commit

Permalink
ADAP-381: adds query tagging support and materializations to dbt-reds…
Browse files Browse the repository at this point in the history
…hift
  • Loading branch information
trouze committed Mar 21, 2023
1 parent b9da951 commit fd26935
Show file tree
Hide file tree
Showing 12 changed files with 653 additions and 0 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230321-181232.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Query tagging enabled via yaml and sql configs
time: 2023-03-21T18:12:32.4933313-05:00
custom:
Author: trouze
Issue: "376"
10 changes: 10 additions & 0 deletions dbt/adapters/redshift/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class RedshiftCredentials(Credentials):
role: Optional[str] = None
sslmode: Optional[str] = None
retries: int = 1
query_tag: Optional[str] = None

_ALIASES = {"dbname": "database", "pass": "password"}

Expand All @@ -78,6 +79,7 @@ def _connection_keys(self):
"cluster_id",
"iam_profile",
"sslmode",
"query_tag",
)

@property
Expand Down Expand Up @@ -121,6 +123,10 @@ def connect():
)
if self.credentials.role:
c.cursor().execute("set role {}".format(self.credentials.role))
if self.credentials.query_tag:
c.cursor().execute(
"set query_group to '{}'".format(self.credentials.query_tag)
)
return c

elif method == RedshiftConnectionMethod.IAM:
Expand All @@ -143,6 +149,10 @@ def connect():
)
if self.credentials.role:
c.cursor().execute("set role {}".format(self.credentials.role))
if self.credentials.query_tag:
c.cursor().execute(
"set query_group to '{}'".format(self.credentials.query_tag)
)
return c

else:
Expand Down
1 change: 1 addition & 0 deletions dbt/adapters/redshift/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class RedshiftConfig(AdapterConfig):
sort: Optional[str] = None
bind: Optional[bool] = None
backup: Optional[bool] = True
query_tag: Optional[str] = None


class RedshiftAdapter(SQLAdapter):
Expand Down
41 changes: 41 additions & 0 deletions dbt/include/redshift/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,47 @@
{% endmacro %}


{% macro get_current_query_tag() -%}
{{ return(run_query("show query_group").rows[0]['query_group']) }}
{% endmacro %}


{% macro set_query_tag() -%}
{{ return(adapter.dispatch('set_query_tag', 'dbt')()) }}
{% endmacro %}


{% macro redshift__set_query_tag() -%}
{% set new_query_tag = config.get('query_tag') %}
{% if new_query_tag %}
{% set original_query_tag = get_current_query_tag() %}
{{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
{% do run_query("set query_group to '{}'".format(new_query_tag)) %}
{{ return(original_query_tag)}}
{% endif %}
{{ return(none)}}
{% endmacro %}


{% macro unset_query_tag(original_query_tag) -%}
{{ return(adapter.dispatch('unset_query_tag', 'dbt')(original_query_tag)) }}
{% endmacro %}


{% macro redshift__unset_query_tag(original_query_tag) -%}
{% set new_query_tag = config.get('query_tag') %}
{% if new_query_tag %}
{% if original_query_tag %}
{{ log("Resetting query_tag to '" ~ original_query_tag ~ "'.") }}
{% do run_query("set query_group to '{}'".format(original_query_tag)) %}
{% else %}
{{ log("No original query_tag, unsetting parameter.") }}
{% do run_query("reset query_group") %}
{% endif %}
{% endif %}
{% endmacro %}


{% macro redshift__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}

{% if add_columns %}
Expand Down
96 changes: 96 additions & 0 deletions dbt/include/redshift/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@

{% materialization incremental, adapter='redshift' -%}

{% set original_query_tag = set_query_tag() %}

-- relations
{%- set existing_relation = load_cached_relation(this) -%}
{%- set target_relation = this.incorporate(type='table') -%}
{%- set temp_relation = make_temp_relation(target_relation)-%}
{%- set intermediate_relation = make_intermediate_relation(target_relation)-%}
{%- set backup_relation_type = 'table' if existing_relation is none else existing_relation.type -%}
{%- set backup_relation = make_backup_relation(target_relation, backup_relation_type) -%}

-- configs
{%- set unique_key = config.get('unique_key') -%}
{%- set full_refresh_mode = (should_full_refresh() or existing_relation.is_view) -%}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}

-- the temp_ and backup_ relations should not already exist in the database; get_relation
-- will return None in that case. Otherwise, we get a relation that we can drop
-- later, before we try to use this name for the current operation. This has to happen before
-- BEGIN, in a separate transaction
{%- set preexisting_intermediate_relation = load_cached_relation(intermediate_relation)-%}
{%- set preexisting_backup_relation = load_cached_relation(backup_relation) -%}
-- grab current tables grants config for comparision later on
{% set grant_config = config.get('grants') %}
{{ drop_relation_if_exists(preexisting_intermediate_relation) }}
{{ drop_relation_if_exists(preexisting_backup_relation) }}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set to_drop = [] %}

{% if existing_relation is none %}
{% set build_sql = get_create_table_as_sql(False, target_relation, sql) %}
{% elif full_refresh_mode %}
{% set build_sql = get_create_table_as_sql(False, intermediate_relation, sql) %}
{% set need_swap = true %}
{% else %}
{% do run_query(get_create_table_as_sql(True, temp_relation, sql)) %}
{% do adapter.expand_target_column_types(
from_relation=temp_relation,
to_relation=target_relation) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, temp_relation, existing_relation) %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}

{#-- Get the incremental_strategy, the macro to use for the strategy, and build the sql --#}
{% set incremental_strategy = config.get('incremental_strategy') or 'default' %}
{% set incremental_predicates = config.get('predicates', none) or config.get('incremental_predicates', none) %}
{% set strategy_sql_macro_func = adapter.get_incremental_strategy_macro(context, incremental_strategy) %}
{% set strategy_arg_dict = ({'target_relation': target_relation, 'temp_relation': temp_relation, 'unique_key': unique_key, 'dest_columns': dest_columns, 'incremental_predicates': incremental_predicates }) %}
{% set build_sql = strategy_sql_macro_func(strategy_arg_dict) %}

{% endif %}

{% call statement("main") %}
{{ build_sql }}
{% endcall %}

{% if need_swap %}
{% do adapter.rename_relation(target_relation, backup_relation) %}
{% do adapter.rename_relation(intermediate_relation, target_relation) %}
{% do to_drop.append(backup_relation) %}
{% endif %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{% if existing_relation is none or existing_relation.is_view or should_full_refresh() %}
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{% do adapter.commit() %}

{% for rel in to_drop %}
{% do adapter.drop_relation(rel) %}
{% endfor %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{% do unset_query_tag(original_query_tag) %}

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}
64 changes: 64 additions & 0 deletions dbt/include/redshift/macros/materializations/seed.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
{% materialization seed, adapter='redshift' %}

{% set original_query_tag = set_query_tag() %}

{%- set identifier = model['alias'] -%}
{%- set full_refresh_mode = (should_full_refresh()) -%}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}

{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}

{%- set grant_config = config.get('grants') -%}
{%- set agate_table = load_agate_table() -%}
-- grab current tables grants config for comparision later on

{%- do store_result('agate_table', response='OK', agate_table=agate_table) -%}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}

-- build model
{% set create_table_sql = "" %}
{% if exists_as_view %}
{{ exceptions.raise_compiler_error("Cannot seed to '{}', it is a view".format(old_relation)) }}
{% elif exists_as_table %}
{% set create_table_sql = reset_csv_table(model, full_refresh_mode, old_relation, agate_table) %}
{% else %}
{% set create_table_sql = create_csv_table(model, agate_table) %}
{% endif %}

{% set code = 'CREATE' if full_refresh_mode else 'INSERT' %}
{% set rows_affected = (agate_table.rows | length) %}
{% set sql = load_csv_rows(model, agate_table) %}

{% call noop_statement('main', code ~ ' ' ~ rows_affected, code, rows_affected) %}
{{ get_csv_sql(create_table_sql, sql) }};
{% endcall %}

{% set target_relation = this.incorporate(type='table') %}

{% set should_revoke = should_revoke(old_relation, full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{% if full_refresh_mode or not exists_as_table %}
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

-- `COMMIT` happens here
{{ adapter.commit() }}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{% do unset_query_tag(original_query_tag) %}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
104 changes: 104 additions & 0 deletions dbt/include/redshift/macros/materializations/snapshot.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
{% materialization snapshot, adapter='redshift' %}

{% set original_query_tag = set_query_tag() %}

{%- set config = model['config'] -%}

{%- set target_table = model.get('alias', model.get('name')) -%}

{%- set strategy_name = config.get('strategy') -%}
{%- set unique_key = config.get('unique_key') %}
-- grab current tables grants config for comparision later on
{%- set grant_config = config.get('grants') -%}

{% set target_relation_exists, target_relation = get_or_create_relation(
database=model.database,
schema=model.schema,
identifier=target_table,
type='table') -%}

{%- if not target_relation.is_table -%}
{% do exceptions.relation_wrong_type(target_relation, 'table') %}
{%- endif -%}


{{ run_hooks(pre_hooks, inside_transaction=False) }}

{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}

{% if not target_relation_exists %}

{% set build_sql = build_snapshot_table(strategy, model['compiled_code']) %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}

{% else %}

{{ adapter.valid_snapshot_target(target_relation) }}

{% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %}

-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}

{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}

{% do create_columns(target_relation, missing_columns) %}

{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}

{% set quoted_source_columns = [] %}
{% for column in source_columns %}
{% do quoted_source_columns.append(adapter.quote(column.name)) %}
{% endfor %}

{% set final_sql = snapshot_merge_sql(
target = target_relation,
source = staging_table,
insert_cols = quoted_source_columns
)
%}

{% endif %}

{% call statement('main') %}
{{ final_sql }}
{% endcall %}

{% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{% if not target_relation_exists %}
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{% if staging_table is defined %}
{% do post_snapshot(staging_table) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{% do unset_query_tag(original_query_tag) %}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
Loading

0 comments on commit fd26935

Please sign in to comment.