diff --git a/.changes/unreleased/Features-20230321-181232.yaml b/.changes/unreleased/Features-20230321-181232.yaml
new file mode 100644
index 000000000..de111b198
--- /dev/null
+++ b/.changes/unreleased/Features-20230321-181232.yaml
@@ -0,0 +1,6 @@
+kind: Features
+body: Query tagging enabled via yaml and sql configs
+time: 2023-03-21T18:12:32.4933313-05:00
+custom:
+  Author: trouze
+  Issue: "376"
diff --git a/dbt/adapters/redshift/connections.py b/dbt/adapters/redshift/connections.py
index 0c25ec08d..9a276d928 100644
--- a/dbt/adapters/redshift/connections.py
+++ b/dbt/adapters/redshift/connections.py
@@ -149,6 +149,7 @@ class RedshiftCredentials(Credentials):
     role: Optional[str] = None
     sslmode: Optional[UserSSLMode] = field(default_factory=UserSSLMode.default)
     retries: int = 1
+    query_tag: Optional[str] = None
     region: Optional[str] = None  # if not provided, will be determined from host
     # opt-in by default per team deliberation on https://peps.python.org/pep-0249/#autocommit
     autocommit: Optional[bool] = True
@@ -170,6 +171,7 @@ def _connection_keys(self):
             "cluster_id",
             "iam_profile",
             "sslmode",
+            "query_tag",
             "region",
         )
 
@@ -243,6 +245,10 @@ def connect():
                     c.autocommit = True
                 if self.credentials.role:
                     c.cursor().execute("set role {}".format(self.credentials.role))
+                if self.credentials.query_tag:
+                    c.cursor().execute(
+                        "set query_group to '{}'".format(self.credentials.query_tag)
+                    )
                 return c
 
         elif method == RedshiftConnectionMethod.IAM:
@@ -267,6 +273,10 @@ def connect():
                     c.autocommit = True
                 if self.credentials.role:
                     c.cursor().execute("set role {}".format(self.credentials.role))
+                if self.credentials.query_tag:
+                    c.cursor().execute(
+                        "set query_group to '{}'".format(self.credentials.query_tag)
+                    )
                 return c
 
         else:
diff --git a/dbt/adapters/redshift/impl.py b/dbt/adapters/redshift/impl.py
index 54fdd7dcf..a34bd340a 100644
--- a/dbt/adapters/redshift/impl.py
+++ b/dbt/adapters/redshift/impl.py
@@ -27,6 +27,7 @@ class RedshiftConfig(AdapterConfig):
     sort: Optional[str] = None
     bind: Optional[bool] = None
     backup: Optional[bool] = True
+    query_tag: Optional[str] = None
 
 
 class RedshiftAdapter(SQLAdapter):
diff --git a/dbt/include/redshift/macros/adapters.sql b/dbt/include/redshift/macros/adapters.sql
index aae87489f..0b680c4c8 100644
--- a/dbt/include/redshift/macros/adapters.sql
+++ b/dbt/include/redshift/macros/adapters.sql
@@ -267,6 +267,37 @@
 {% endmacro %}
 
 
+{% macro get_current_query_tag() -%}
+  {{ return(run_query("show query_group").rows[0]['query_group']) }}
+{% endmacro %}
+
+
+{% macro redshift__set_query_tag() -%}
+  {% set new_query_tag = config.get('query_tag') %}
+  {% if new_query_tag %}
+    {% set original_query_tag = get_current_query_tag() %}
+    {{ log("Setting query_tag to '" ~ new_query_tag ~ "'. Will reset to '" ~ original_query_tag ~ "' after materialization.") }}
+    {% do run_query("set query_group to '{}'".format(new_query_tag)) %}
+    {{ return(original_query_tag)}}
+  {% endif %}
+  {{ return(none)}}
+{% endmacro %}
+
+
+{% macro redshift__unset_query_tag(original_query_tag) -%}
+  {% set new_query_tag = config.get('query_tag') %}
+  {% if new_query_tag %}
+    {% if original_query_tag %}
+      {{ log("Resetting query_tag to '" ~ original_query_tag ~ "'.") }}
+      {% do run_query("set query_group to '{}'".format(original_query_tag)) %}
+    {% else %}
+      {{ log("No original query_tag, unsetting parameter.") }}
+      {% do run_query("reset query_group") %}
+    {% endif %}
+  {% endif %}
+{% endmacro %}
+
+
 {% macro redshift__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}
 
   {% if add_columns %}
diff --git a/tests/functional/adapter/test_query_tags.py b/tests/functional/adapter/test_query_tags.py
new file mode 100644
index 000000000..21ef6c484
--- /dev/null
+++ b/tests/functional/adapter/test_query_tags.py
@@ -0,0 +1,137 @@
+import pytest
+from dbt.tests.util import run_dbt
+
+snapshots__snapshot_query_tag_sql = """
+{% snapshot snapshot_query_tag %}
+    {{
+        config(
+            target_database=database,
+            target_schema=schema,
+            unique_key='id',
+            strategy='check',
+            check_cols=['color'],
+        )
+    }}
+    select 1 as id, 'blue'::varchar(4) as color
+{% endsnapshot %}
+
+"""
+
+models__table_model_query_tag_sql = """
+{{ config(materialized = 'table') }}
+
+select 1 as id
+
+"""
+
+models__models_config_yml = """
+version: 2
+
+models:
+  - name: view_model_query_tag
+    columns:
+      - name: id
+        tests:
+          - unique
+
+"""
+
+models__view_model_query_tag_sql = """
+{{ config(materialized = 'view') }}
+
+select 1 as id
+
+"""
+
+models__incremental_model_query_tag_sql = """
+{{ config(materialized = 'incremental', unique_key = 'id') }}
+
+select 1 as id
+
+"""
+
+macros__check_tag_sql = """
+{% macro check_query_tag() %}
+
+  {% if execute %}
+    {% set query_tag = get_current_query_tag() %}
+    {% if query_tag != var("query_tag") %}
+      {{ exceptions.raise_compiler_error("Query tag not used!") }}
+    {% endif %}
+  {% endif %}
+
+{% endmacro %}
+
+"""
+
+seeds__seed_query_tag_csv = """id
+1
+"""
+
+
+class TestQueryTag:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "table_model_query_tag.sql": models__table_model_query_tag_sql,
+            "view_model_query_tag.sql": models__view_model_query_tag_sql,
+            "incremental_model_query_tag.sql": models__incremental_model_query_tag_sql,
+            "models_config.yml": models__models_config_yml,
+        }
+
+    @pytest.fixture(scope="class")
+    def snapshots(self):
+        return {"snapshot_query_tag.sql": snapshots__snapshot_query_tag_sql}
+
+    @pytest.fixture(scope="class")
+    def macros(self):
+        return {"check_tag.sql": macros__check_tag_sql}
+
+    @pytest.fixture(scope="class")
+    def seeds(self):
+        return {"seed_query_tag.csv": seeds__seed_query_tag_csv}
+
+    @pytest.fixture(scope="class")
+    def project_config_update(self, prefix):
+        return {
+            "config-version": 2,
+            "models": {
+                "tests": {"query_tag": prefix, "post-hook": "{{ check_tag() }}"},
+            },
+            "seeds": {
+                "tests": {"query_tag": prefix, "post-hook": "{{ check_tag() }}"},
+            },
+            "snapshots": {
+                "tests": {"query_tag": prefix, "post-hook": "{{ check_tag() }}"},
+            },
+            "tests": {"test": {"query_tag": prefix, "post-hook": "{{ check_query_tag() }}"}},
+        }
+
+    def build_all_with_query_tags(self, project, prefix):
+        run_dbt(["build", "--vars", '{{"check_tag": "{}"}}'.format(prefix)])
+
+    def test_redshift_query_tag(self, project, prefix):
+        self.build_all_with_query_tags(project, prefix)
+        self.build_all_with_query_tags(project, prefix)
+
+
+class TestRedshiftProfileQueryTag:
+    @pytest.fixture(scope="class")
+    def models(self):
+        return {
+            "table_model_query_tag.sql": models__table_model_query_tag_sql,
+            "view_model_query_tag.sql": models__view_model_query_tag_sql,
+            "incremental_model_query_tag.sql": models__incremental_model_query_tag_sql,
+            "models_config.yml": models__models_config_yml,
+        }
+
+    @pytest.fixture(scope="class")
+    def profiles_config_update(self, prefix):
+        return {"query_tag": prefix}
+
+    def build_all_with_query_tags(self, project, prefix):
+        run_dbt(["build", "--vars", '{{"check_tag": "{}"}}'.format(prefix)])
+
+    def test_redshift_query_tag(self, project, prefix):
+        self.build_all_with_query_tags(project, prefix)
+        self.build_all_with_query_tags(project, prefix)
diff --git a/tests/unit/test_redshift_adapter.py b/tests/unit/test_redshift_adapter.py
index 5e4e00b94..f499aa05b 100644
--- a/tests/unit/test_redshift_adapter.py
+++ b/tests/unit/test_redshift_adapter.py
@@ -575,6 +575,14 @@ def test_add_query_success(self):
             "select * from test3", True, bindings=None, abridge_sql_log=False
         )
 
+    def test_query_tagging(self):
+        self.config.credentials = self.config.credentials.replace(query_tag="test_query_tag")
+
+        expected_connection_info = [
+            (k, v) for (k, v) in self.config.credentials.connection_info() if k == "query_tag"
+        ]
+        self.assertEqual([("query_tag", "test_query_tag")], expected_connection_info)
+
 
 class TestRedshiftAdapterConversions(TestAdapterConversions):
     def test_convert_text_type(self):