From 2fdc113d9378cea89971e090240965c4fda29fc9 Mon Sep 17 00:00:00 2001 From: Joshua Carp Date: Thu, 4 Mar 2021 00:06:59 -0500 Subject: [PATCH 1/8] Parse query comment and use as bigquery job labels. --- core/dbt/contracts/connection.py | 1 + .../bigquery/dbt/adapters/bigquery/connections.py | 14 ++++++++++---- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/core/dbt/contracts/connection.py b/core/dbt/contracts/connection.py index 86c6978c7a1..22765b583d8 100644 --- a/core/dbt/contracts/connection.py +++ b/core/dbt/contracts/connection.py @@ -215,6 +215,7 @@ def to_target_dict(self): class QueryComment(dbtClassMixin): comment: str = DEFAULT_QUERY_COMMENT append: bool = False + job_label: bool = False class AdapterRequiredConfig(HasCredentials, Protocol): diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index d660e75dd1d..3e52abc67da 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -1,3 +1,4 @@ +import json from contextlib import contextmanager from dataclasses import dataclass from functools import lru_cache @@ -305,12 +306,17 @@ def raw_execute(self, sql, fetch=False, *, use_legacy_sql=False): logger.debug('On {}: {}', conn.name, sql) - job_params = {'use_legacy_sql': use_legacy_sql} + job_params = {'use_legacy_sql': use_legacy_sql, 'labels': {}} if active_user: - job_params['labels'] = { - 'dbt_invocation_id': active_user.invocation_id - } + job_params['labels']['dbt_invocation_id'] = active_user.invocation_id + + if self.profile.query_comment.job_label: + try: + labels = json.loads(self.query_header.comment.query_comment) + job_params['labels'].update(labels) + except (TypeError, ValueError): + pass priority = conn.credentials.priority if priority == Priority.Batch: From 62be9f90643aa862ffa57bccad863665c97787b0 Mon Sep 17 00:00:00 2001 From: Joshua Carp Date: Thu, 4 Mar 2021 00:14:50 -0500 Subject: [PATCH 2/8] Sanitize bigquery labels. --- .../dbt/adapters/bigquery/connections.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index 3e52abc67da..7c13e5f4b70 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -1,4 +1,5 @@ import json +import re from contextlib import contextmanager from dataclasses import dataclass from functools import lru_cache @@ -314,7 +315,10 @@ def raw_execute(self, sql, fetch=False, *, use_legacy_sql=False): if self.profile.query_comment.job_label: try: labels = json.loads(self.query_header.comment.query_comment) - job_params['labels'].update(labels) + job_params['labels'].update({ + _sanitize_bigquery_label(key): _sanitize_bigquery_label(str(value)) + for key, value in labels.items() + }) except (TypeError, ValueError): pass @@ -579,3 +583,14 @@ def _is_retryable(error): e['reason'] == 'rateLimitExceeded' for e in error.errors): return True return False + + +_SANITIZE_BIGQUERY_LABEL_PATTERN = re.compile(r"[^a-z0-9_-]") + + +def _sanitize_bigquery_label(value: str, max_length: int = 63) -> str: + """Return a legal value for a BigQuery label.""" + value = value.lower() + value = _SANITIZE_BIGQUERY_LABEL_PATTERN.sub("_", value) + value = value[: max_length - 1] + return value From 7b0c74ca3e50c21e91c7086cc2a7d782feb03785 Mon Sep 17 00:00:00 2001 From: Joshua Carp Date: Thu, 4 Mar 2021 00:34:46 -0500 Subject: [PATCH 3/8] Fix lint. --- .../dbt/adapters/bigquery/connections.py | 22 +++++++++++-------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index 7c13e5f4b70..99e68f7313f 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -307,21 +307,25 @@ def raw_execute(self, sql, fetch=False, *, use_legacy_sql=False): logger.debug('On {}: {}', conn.name, sql) - job_params = {'use_legacy_sql': use_legacy_sql, 'labels': {}} + labels = {} if active_user: - job_params['labels']['dbt_invocation_id'] = active_user.invocation_id + labels['dbt_invocation_id'] = active_user.invocation_id if self.profile.query_comment.job_label: try: - labels = json.loads(self.query_header.comment.query_comment) - job_params['labels'].update({ - _sanitize_bigquery_label(key): _sanitize_bigquery_label(str(value)) - for key, value in labels.items() + comment_labels = json.loads( + self.query_header.comment.query_comment + ) + labels.update({ + _sanitize_label(key): _sanitize_label(str(value)) + for key, value in comment_labels.items() }) except (TypeError, ValueError): pass + job_params = {'use_legacy_sql': use_legacy_sql, 'labels': labels} + priority = conn.credentials.priority if priority == Priority.Batch: job_params['priority'] = google.cloud.bigquery.QueryPriority.BATCH @@ -585,12 +589,12 @@ def _is_retryable(error): return False -_SANITIZE_BIGQUERY_LABEL_PATTERN = re.compile(r"[^a-z0-9_-]") +_SANITIZE_LABEL_PATTERN = re.compile(r"[^a-z0-9_-]") -def _sanitize_bigquery_label(value: str, max_length: int = 63) -> str: +def _sanitize_label(value: str, max_length: int = 63) -> str: """Return a legal value for a BigQuery label.""" value = value.lower() - value = _SANITIZE_BIGQUERY_LABEL_PATTERN.sub("_", value) + value = _SANITIZE_LABEL_PATTERN.sub("_", value) value = value[: max_length - 1] return value From 8d73ae2cc00c71ad1b0301d7a52706e95d0488f0 Mon Sep 17 00:00:00 2001 From: Joshua Carp Date: Thu, 4 Mar 2021 10:06:52 -0500 Subject: [PATCH 4/8] Address comments from code review. --- .../dbt/adapters/bigquery/connections.py | 28 ++++++++++--------- test/unit/test_bigquery_adapter.py | 12 ++++++-- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index 99e68f7313f..69ae2178532 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -307,23 +307,15 @@ def raw_execute(self, sql, fetch=False, *, use_legacy_sql=False): logger.debug('On {}: {}', conn.name, sql) - labels = {} + if self.profile.query_comment.job_label: + query_comment = self.query_header.comment.query_comment + labels = self._labels_from_query_comment(query_comment) + else: + labels = {} if active_user: labels['dbt_invocation_id'] = active_user.invocation_id - if self.profile.query_comment.job_label: - try: - comment_labels = json.loads( - self.query_header.comment.query_comment - ) - labels.update({ - _sanitize_label(key): _sanitize_label(str(value)) - for key, value in comment_labels.items() - }) - except (TypeError, ValueError): - pass - job_params = {'use_legacy_sql': use_legacy_sql, 'labels': labels} priority = conn.credentials.priority @@ -558,6 +550,16 @@ def _retry_generator(self): initial=self.DEFAULT_INITIAL_DELAY, maximum=self.DEFAULT_MAXIMUM_DELAY) + def _labels_from_query_comment(self, comment: str) -> Dict: + try: + comment_labels = json.loads(comment) + except (TypeError, ValueError): + return {'query_comment': _sanitize_label(comment)} + return { + _sanitize_label(key): _sanitize_label(str(value)) + for key, value in comment_labels.items() + } + class _ErrorCounter(object): """Counts errors seen up to a threshold then raises the next error.""" diff --git a/test/unit/test_bigquery_adapter.py b/test/unit/test_bigquery_adapter.py index 00a4ef3b885..2df7a56ffba 100644 --- a/test/unit/test_bigquery_adapter.py +++ b/test/unit/test_bigquery_adapter.py @@ -1,5 +1,6 @@ import agate import decimal +import json import re import unittest from contextlib import contextmanager @@ -588,7 +589,6 @@ def test_query_and_results(self, mock_bq): self.mock_client.query.assert_called_once_with( 'sql', job_config=mock_bq.QueryJobConfig()) - def test_copy_bq_table_appends(self): self._copy_table( write_disposition=dbt.adapters.bigquery.impl.WRITE_APPEND) @@ -615,12 +615,20 @@ def test_copy_bq_table_truncates(self): kwargs['job_config'].write_disposition, dbt.adapters.bigquery.impl.WRITE_TRUNCATE) + def test_job_labels_valid_json(self): + expected = {"key": "value"} + labels = self.connections._labels_from_query_comment(json.dumps(expected)) + self.assertEqual(labels, expected) + + def test_job_labels_invalid_json(self): + labels = self.connections._labels_from_query_comment("not json") + self.assertEqual(labels, {"query_comment": "not_json"}) + def _table_ref(self, proj, ds, table, conn): return google.cloud.bigquery.table.TableReference.from_string( '{}.{}.{}'.format(proj, ds, table)) def _copy_table(self, write_disposition): - self.connections.table_ref = self._table_ref source = BigQueryRelation.create( database='project', schema='dataset', identifier='table1') From c71a18ca07308ae25aeb0c00e01abcd9ca2834c3 Mon Sep 17 00:00:00 2001 From: Joshua Carp Date: Fri, 5 Mar 2021 00:09:17 -0500 Subject: [PATCH 5/8] Hyphenate query comment fields and fix deserialization bug. --- core/dbt/contracts/connection.py | 4 ++-- core/dbt/dataclass_schema.py | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/core/dbt/contracts/connection.py b/core/dbt/contracts/connection.py index 22765b583d8..109482e252b 100644 --- a/core/dbt/contracts/connection.py +++ b/core/dbt/contracts/connection.py @@ -9,7 +9,7 @@ from dbt.logger import GLOBAL_LOGGER as logger from typing_extensions import Protocol from dbt.dataclass_schema import ( - dbtClassMixin, StrEnum, ExtensibleDbtClassMixin, + dbtClassMixin, StrEnum, ExtensibleDbtClassMixin, HyphenatedDbtClassMixin, ValidatedStringMixin, register_pattern ) from dbt.contracts.util import Replaceable @@ -212,7 +212,7 @@ def to_target_dict(self): @dataclass -class QueryComment(dbtClassMixin): +class QueryComment(HyphenatedDbtClassMixin): comment: str = DEFAULT_QUERY_COMMENT append: bool = False job_label: bool = False diff --git a/core/dbt/dataclass_schema.py b/core/dbt/dataclass_schema.py index 1382d489c50..13e6e2c7339 100644 --- a/core/dbt/dataclass_schema.py +++ b/core/dbt/dataclass_schema.py @@ -76,7 +76,9 @@ def __post_serialize__(self, dct, options=None): # performing the conversion to a dict @classmethod def __pre_deserialize__(cls, data, options=None): - if cls._hyphenated: + # `data` might not be a dict, e.g. for `query_comment`, which accepts + # a dict or a string; only snake-case for dict values. + if cls._hyphenated and isinstance(data, dict): new_dict = {} for key in data: if '-' in key: From af3c3f4cbe0d306a5e663c86c99bbfc30158a671 Mon Sep 17 00:00:00 2001 From: Joshua Carp Date: Mon, 8 Mar 2021 15:39:02 -0500 Subject: [PATCH 6/8] Add tests for bigquery label sanitize helper. --- .../bigquery/dbt/adapters/bigquery/connections.py | 4 ++-- test/unit/test_bigquery_adapter.py | 15 +++++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index 69ae2178532..acbecfa84c2 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -596,7 +596,7 @@ def _is_retryable(error): def _sanitize_label(value: str, max_length: int = 63) -> str: """Return a legal value for a BigQuery label.""" - value = value.lower() + value = value.strip().lower() value = _SANITIZE_LABEL_PATTERN.sub("_", value) - value = value[: max_length - 1] + value = value[: max_length] return value diff --git a/test/unit/test_bigquery_adapter.py b/test/unit/test_bigquery_adapter.py index 7cc71c6e459..5f317376191 100644 --- a/test/unit/test_bigquery_adapter.py +++ b/test/unit/test_bigquery_adapter.py @@ -16,12 +16,14 @@ from dbt.adapters.bigquery import BigQueryRelation from dbt.adapters.bigquery import Plugin as BigQueryPlugin from dbt.adapters.bigquery.connections import BigQueryConnectionManager +from dbt.adapters.bigquery.connections import _sanitize_label from dbt.adapters.base.query_headers import MacroQueryStringSetter from dbt.clients import agate_helper import dbt.exceptions from dbt.logger import GLOBAL_LOGGER as logger # noqa from dbt.context.providers import RuntimeConfigObject +import pytest import google.cloud.bigquery from .utils import config_from_parts_or_dicts, inject_adapter, TestAdapterConversions @@ -939,3 +941,16 @@ def test_convert_time_type(self): expected = ['time', 'time', 'time'] for col_idx, expect in enumerate(expected): assert BigQueryAdapter.convert_time_type(agate_table, col_idx) == expect + + +@pytest.mark.parametrize( + ["input", "output"], + [ + ("a" * 64, "a" * 63), + ("ABC", "abc"), + ("a c", "a_c"), + ("a ", "a"), + ], +) +def test_sanitize_label(input, output): + assert _sanitize_label(input) == output From 8566a46793d980df0e6af3b1723434e8423827c6 Mon Sep 17 00:00:00 2001 From: Joshua Carp Date: Mon, 8 Mar 2021 15:50:00 -0500 Subject: [PATCH 7/8] Add BigQuery job labels to changelog. --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c25e0af1354..4cb32d68bc8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ ### Features - Add optional configs for `require_partition_filter` and `partition_expiration_days` in BigQuery ([#1843](https://github.com/fishtown-analytics/dbt/issues/1843), [#2928](https://github.com/fishtown-analytics/dbt/pull/2928)) - Fix for EOL SQL comments prevent entire line execution ([#2731](https://github.com/fishtown-analytics/dbt/issues/2731), [#2974](https://github.com/fishtown-analytics/dbt/pull/2974)) +- Use query comment JSON as job labels for BigQuery adapter when `query-comment.job-label` is set to `true` ([#2483](https://github.com/fishtown-analytics/dbt/issues/2483)), ([#3145](https://github.com/fishtown-analytics/dbt/pull/3145)) ### Under the hood - Add dependabot configuration for alerting maintainers about keeping dependencies up to date and secure. ([#3061](https://github.com/fishtown-analytics/dbt/issues/3061), [#3062](https://github.com/fishtown-analytics/dbt/pull/3062)) @@ -23,6 +24,7 @@ Contributors: - [ran-eh](https://github.com/ran-eh) ([#3036](https://github.com/fishtown-analytics/dbt/pull/3036)) - [@pcasteran](https://github.com/pcasteran) ([#2976](https://github.com/fishtown-analytics/dbt/pull/2976)) - [@VasiliiSurov](https://github.com/VasiliiSurov) ([#3104](https://github.com/fishtown-analytics/dbt/pull/3104)) +- [@jmcarp](https://github.com/jmcarp) ([#3145](https://github.com/fishtown-analytics/dbt/pull/3145)) ## dbt 0.19.1 (Release TBD) From 044a6c6ea4966f04a7f27bbd506ad172d01a6149 Mon Sep 17 00:00:00 2001 From: Joshua Carp Date: Fri, 19 Mar 2021 23:50:45 -0400 Subject: [PATCH 8/8] Cleanups from code review. --- plugins/bigquery/dbt/adapters/bigquery/connections.py | 3 +-- test/unit/test_bigquery_adapter.py | 3 +-- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/plugins/bigquery/dbt/adapters/bigquery/connections.py b/plugins/bigquery/dbt/adapters/bigquery/connections.py index acbecfa84c2..a3ff5d0e6f1 100644 --- a/plugins/bigquery/dbt/adapters/bigquery/connections.py +++ b/plugins/bigquery/dbt/adapters/bigquery/connections.py @@ -594,9 +594,8 @@ def _is_retryable(error): _SANITIZE_LABEL_PATTERN = re.compile(r"[^a-z0-9_-]") -def _sanitize_label(value: str, max_length: int = 63) -> str: +def _sanitize_label(value: str) -> str: """Return a legal value for a BigQuery label.""" value = value.strip().lower() value = _SANITIZE_LABEL_PATTERN.sub("_", value) - value = value[: max_length] return value diff --git a/test/unit/test_bigquery_adapter.py b/test/unit/test_bigquery_adapter.py index 5f317376191..7d42b1b6f64 100644 --- a/test/unit/test_bigquery_adapter.py +++ b/test/unit/test_bigquery_adapter.py @@ -2,6 +2,7 @@ import decimal import json import re +import pytest import unittest from contextlib import contextmanager from requests.exceptions import ConnectionError @@ -23,7 +24,6 @@ from dbt.logger import GLOBAL_LOGGER as logger # noqa from dbt.context.providers import RuntimeConfigObject -import pytest import google.cloud.bigquery from .utils import config_from_parts_or_dicts, inject_adapter, TestAdapterConversions @@ -946,7 +946,6 @@ def test_convert_time_type(self): @pytest.mark.parametrize( ["input", "output"], [ - ("a" * 64, "a" * 63), ("ABC", "abc"), ("a c", "a_c"), ("a ", "a"),