Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADAP-381: adds query tagging support and materializations to dbt-redshift #379

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230321-181232.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Query tagging enabled via yaml and sql configs
time: 2023-03-21T18:12:32.4933313-05:00
custom:
Author: trouze
Issue: "376"
10 changes: 10 additions & 0 deletions dbt/adapters/redshift/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ class RedshiftCredentials(Credentials):
role: Optional[str] = None
sslmode: Optional[UserSSLMode] = field(default_factory=UserSSLMode.default)
retries: int = 1
query_tag: Optional[str] = None
region: Optional[str] = None # if not provided, will be determined from host
# opt-in by default per team deliberation on https://peps.python.org/pep-0249/#autocommit
autocommit: Optional[bool] = True
Expand All @@ -170,6 +171,7 @@ def _connection_keys(self):
"cluster_id",
"iam_profile",
"sslmode",
"query_tag",
"region",
)

Expand Down Expand Up @@ -243,6 +245,10 @@ def connect():
c.autocommit = True
if self.credentials.role:
c.cursor().execute("set role {}".format(self.credentials.role))
if self.credentials.query_tag:
c.cursor().execute(
"set query_group to '{}'".format(self.credentials.query_tag)
)
return c

elif method == RedshiftConnectionMethod.IAM:
Expand All @@ -267,6 +273,10 @@ def connect():
c.autocommit = True
if self.credentials.role:
c.cursor().execute("set role {}".format(self.credentials.role))
if self.credentials.query_tag:
c.cursor().execute(
"set query_group to '{}'".format(self.credentials.query_tag)
)
return c

else:
Expand Down
1 change: 1 addition & 0 deletions dbt/adapters/redshift/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class RedshiftConfig(AdapterConfig):
sort: Optional[str] = None
bind: Optional[bool] = None
backup: Optional[bool] = True
query_tag: Optional[str] = None


class RedshiftAdapter(SQLAdapter):
Expand Down
31 changes: 31 additions & 0 deletions dbt/include/redshift/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,37 @@
{% endmacro %}


{% macro get_current_query_tag() -%}
{{ return(run_query("show query_group").rows[0]['query_group']) }}
{% endmacro %}


{% macro redshift__set_query_tag() -%}
{% set new_query_tag = config.get('query_tag') %}
{% if new_query_tag %}
{% set original_query_tag = get_current_query_tag() %}
{{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
{% do run_query("set query_group to '{}'".format(new_query_tag)) %}
{{ return(original_query_tag)}}
{% endif %}
{{ return(none)}}
{% endmacro %}


{% macro redshift__unset_query_tag(original_query_tag) -%}
{% set new_query_tag = config.get('query_tag') %}
{% if new_query_tag %}
{% if original_query_tag %}
{{ log("Resetting query_tag to '" ~ original_query_tag ~ "'.") }}
{% do run_query("set query_group to '{}'".format(original_query_tag)) %}
{% else %}
{{ log("No original query_tag, unsetting parameter.") }}
{% do run_query("reset query_group") %}
{% endif %}
{% endif %}
{% endmacro %}


{% macro redshift__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}

{% if add_columns %}
Expand Down
137 changes: 137 additions & 0 deletions tests/functional/adapter/test_query_tags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import pytest
from dbt.tests.util import run_dbt

snapshots__snapshot_query_tag_sql = """
{% snapshot snapshot_query_tag %}
{{
config(
target_database=database,
target_schema=schema,
unique_key='id',
strategy='check',
check_cols=['color'],
)
}}
select 1 as id, 'blue'::varchar(4) as color
{% endsnapshot %}

"""

models__table_model_query_tag_sql = """
{{ config(materialized = 'table') }}

select 1 as id

"""

models__models_config_yml = """
version: 2

models:
- name: view_model_query_tag
columns:
- name: id
tests:
- unique

"""

models__view_model_query_tag_sql = """
{{ config(materialized = 'view') }}

select 1 as id

"""

models__incremental_model_query_tag_sql = """
{{ config(materialized = 'incremental', unique_key = 'id') }}

select 1 as id

"""

macros__check_tag_sql = """
{% macro check_query_tag() %}

{% if execute %}
{% set query_tag = get_current_query_tag() %}
{% if query_tag != var("query_tag") %}
{{ exceptions.raise_compiler_error("Query tag not used!") }}
{% endif %}
{% endif %}

{% endmacro %}

"""

seeds__seed_query_tag_csv = """id
1
"""


class TestQueryTag:
@pytest.fixture(scope="class")
def models(self):
return {
"table_model_query_tag.sql": models__table_model_query_tag_sql,
"view_model_query_tag.sql": models__view_model_query_tag_sql,
"incremental_model_query_tag.sql": models__incremental_model_query_tag_sql,
"models_config.yml": models__models_config_yml,
}

@pytest.fixture(scope="class")
def snapshots(self):
return {"snapshot_query_tag.sql": snapshots__snapshot_query_tag_sql}

@pytest.fixture(scope="class")
def macros(self):
return {"check_tag.sql": macros__check_tag_sql}

@pytest.fixture(scope="class")
def seeds(self):
return {"seed_query_tag.csv": seeds__seed_query_tag_csv}

@pytest.fixture(scope="class")
def project_config_update(self, prefix):
return {
"config-version": 2,
"models": {
"tests": {"query_tag": prefix, "post-hook": "{{ check_tag() }}"},
},
"seeds": {
"tests": {"query_tag": prefix, "post-hook": "{{ check_tag() }}"},
},
"snapshots": {
"tests": {"query_tag": prefix, "post-hook": "{{ check_tag() }}"},
},
"tests": {"test": {"query_tag": prefix, "post-hook": "{{ check_query_tag() }}"}},
}

def build_all_with_query_tags(self, project, prefix):
run_dbt(["build", "--vars", '{{"check_tag": "{}"}}'.format(prefix)])

def test_redshift_query_tag(self, project, prefix):
self.build_all_with_query_tags(project, prefix)
self.build_all_with_query_tags(project, prefix)


class TestRedshiftProfileQueryTag:
@pytest.fixture(scope="class")
def models(self):
return {
"table_model_query_tag.sql": models__table_model_query_tag_sql,
"view_model_query_tag.sql": models__view_model_query_tag_sql,
"incremental_model_query_tag.sql": models__incremental_model_query_tag_sql,
"models_config.yml": models__models_config_yml,
}

@pytest.fixture(scope="class")
def profiles_config_update(self, prefix):
return {"query_tag": prefix}

def build_all_with_query_tags(self, project, prefix):
run_dbt(["build", "--vars", '{{"check_tag": "{}"}}'.format(prefix)])

def test_redshift_query_tag(self, project, prefix):
self.build_all_with_query_tags(project, prefix)
self.build_all_with_query_tags(project, prefix)
8 changes: 8 additions & 0 deletions tests/unit/test_redshift_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,14 @@ def test_add_query_success(self):
"select * from test3", True, bindings=None, abridge_sql_log=False
)

def test_query_tagging(self):
self.config.credentials = self.config.credentials.replace(query_tag="test_query_tag")

expected_connection_info = [
(k, v) for (k, v) in self.config.credentials.connection_info() if k == "query_tag"
]
self.assertEqual([("query_tag", "test_query_tag")], expected_connection_info)


class TestRedshiftAdapterConversions(TestAdapterConversions):
def test_convert_text_type(self):
Expand Down