-
Notifications
You must be signed in to change notification settings - Fork 1.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Parse query comment and use as bigquery job labels. #3145
Changes from 9 commits
2fdc113
62be9f9
7b0c74c
8d73ae2
c71a18c
82cca95
af3c3f4
8566a46
9fe2b65
044a6c6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -73,7 +73,9 @@ def __post_serialize__(self, dct): | |
# performing the conversion to a dict | ||
@classmethod | ||
def __pre_deserialize__(cls, data): | ||
if cls._hyphenated: | ||
# `data` might not be a dict, e.g. for `query_comment`, which accepts | ||
# a dict or a string; only snake-case for dict values. | ||
if cls._hyphenated and isinstance(data, dict): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this make sense @gshank? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looking at all the existing calls to this class method, it seems to be true. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Honestly, I'm not sure exactly how this code gets reached, but I dropped into a debugger at this point on the unit test suite and found non-dictionary values--specifically, there's a test that sets There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You can also run that test with the following added to pre_deserialize:
Looking at the stack, I'm inside a function called |
||
new_dict = {} | ||
for key in data: | ||
if '-' in key: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,3 +1,5 @@ | ||
import json | ||
import re | ||
from contextlib import contextmanager | ||
from dataclasses import dataclass | ||
from functools import lru_cache | ||
|
@@ -305,12 +307,16 @@ def raw_execute(self, sql, fetch=False, *, use_legacy_sql=False): | |
|
||
logger.debug('On {}: {}', conn.name, sql) | ||
|
||
job_params = {'use_legacy_sql': use_legacy_sql} | ||
if self.profile.query_comment.job_label: | ||
query_comment = self.query_header.comment.query_comment | ||
labels = self._labels_from_query_comment(query_comment) | ||
else: | ||
labels = {} | ||
|
||
if active_user: | ||
job_params['labels'] = { | ||
'dbt_invocation_id': active_user.invocation_id | ||
} | ||
labels['dbt_invocation_id'] = active_user.invocation_id | ||
|
||
job_params = {'use_legacy_sql': use_legacy_sql, 'labels': labels} | ||
|
||
priority = conn.credentials.priority | ||
if priority == Priority.Batch: | ||
|
@@ -544,6 +550,16 @@ def _retry_generator(self): | |
initial=self.DEFAULT_INITIAL_DELAY, | ||
maximum=self.DEFAULT_MAXIMUM_DELAY) | ||
|
||
def _labels_from_query_comment(self, comment: str) -> Dict: | ||
try: | ||
comment_labels = json.loads(comment) | ||
except (TypeError, ValueError): | ||
return {'query_comment': _sanitize_label(comment)} | ||
return { | ||
_sanitize_label(key): _sanitize_label(str(value)) | ||
for key, value in comment_labels.items() | ||
} | ||
|
||
|
||
class _ErrorCounter(object): | ||
"""Counts errors seen up to a threshold then raises the next error.""" | ||
|
@@ -573,3 +589,14 @@ def _is_retryable(error): | |
e['reason'] == 'rateLimitExceeded' for e in error.errors): | ||
return True | ||
return False | ||
|
||
|
||
_SANITIZE_LABEL_PATTERN = re.compile(r"[^a-z0-9_-]") | ||
|
||
|
||
def _sanitize_label(value: str, max_length: int = 63) -> str: | ||
"""Return a legal value for a BigQuery label.""" | ||
value = value.strip().lower() | ||
value = _SANITIZE_LABEL_PATTERN.sub("_", value) | ||
value = value[: max_length] | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this the behavior we want for long labels? Alternatives include raising our own error, or sending something we know is too long to big query and raising that db exception. As much as this will not often, if ever, be attempted on purpose, a typo could cause this which might be a slightly annoying user experience. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think passing through the input and letting bigquery crash makes sense. |
||
return value |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,6 @@ | ||
import agate | ||
import decimal | ||
import json | ||
import re | ||
import unittest | ||
from contextlib import contextmanager | ||
|
@@ -15,12 +16,14 @@ | |
from dbt.adapters.bigquery import BigQueryRelation | ||
from dbt.adapters.bigquery import Plugin as BigQueryPlugin | ||
from dbt.adapters.bigquery.connections import BigQueryConnectionManager | ||
from dbt.adapters.bigquery.connections import _sanitize_label | ||
from dbt.adapters.base.query_headers import MacroQueryStringSetter | ||
from dbt.clients import agate_helper | ||
import dbt.exceptions | ||
from dbt.logger import GLOBAL_LOGGER as logger # noqa | ||
from dbt.context.providers import RuntimeConfigObject | ||
|
||
import pytest | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I won't block the merge on this, but I'd rather put this import up just above |
||
import google.cloud.bigquery | ||
|
||
from .utils import config_from_parts_or_dicts, inject_adapter, TestAdapterConversions | ||
|
@@ -588,7 +591,6 @@ def test_query_and_results(self, mock_bq): | |
self.mock_client.query.assert_called_once_with( | ||
'sql', job_config=mock_bq.QueryJobConfig()) | ||
|
||
|
||
def test_copy_bq_table_appends(self): | ||
self._copy_table( | ||
write_disposition=dbt.adapters.bigquery.impl.WRITE_APPEND) | ||
|
@@ -615,12 +617,20 @@ def test_copy_bq_table_truncates(self): | |
kwargs['job_config'].write_disposition, | ||
dbt.adapters.bigquery.impl.WRITE_TRUNCATE) | ||
|
||
def test_job_labels_valid_json(self): | ||
expected = {"key": "value"} | ||
labels = self.connections._labels_from_query_comment(json.dumps(expected)) | ||
self.assertEqual(labels, expected) | ||
|
||
def test_job_labels_invalid_json(self): | ||
labels = self.connections._labels_from_query_comment("not json") | ||
self.assertEqual(labels, {"query_comment": "not_json"}) | ||
|
||
def _table_ref(self, proj, ds, table, conn): | ||
return google.cloud.bigquery.table.TableReference.from_string( | ||
'{}.{}.{}'.format(proj, ds, table)) | ||
|
||
def _copy_table(self, write_disposition): | ||
|
||
self.connections.table_ref = self._table_ref | ||
source = BigQueryRelation.create( | ||
database='project', schema='dataset', identifier='table1') | ||
|
@@ -931,3 +941,16 @@ def test_convert_time_type(self): | |
expected = ['time', 'time', 'time'] | ||
for col_idx, expect in enumerate(expected): | ||
assert BigQueryAdapter.convert_time_type(agate_table, col_idx) == expect | ||
|
||
|
||
@pytest.mark.parametrize( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't seem to use this style of test very often, but I actually really like it so I'm happy to introduce it to this file too. |
||
["input", "output"], | ||
[ | ||
("a" * 64, "a" * 63), | ||
("ABC", "abc"), | ||
("a c", "a_c"), | ||
("a ", "a"), | ||
], | ||
) | ||
def test_sanitize_label(input, output): | ||
assert _sanitize_label(input) == output |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The config is
query-comment
, rather thanquery_comment
(docs). Not exactly sure why we did this, but I guess kebab casing is common indbt_project.yml
/ project-level configs. I think this should bejob-label
(instead ofjob_label
) for consistencyThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, I switched the base class to
HyphenatedDbtClassMixin
.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On second thought, I may not understand what this base class does, since that change broke tests that I thought were unrelated. Reverted for now, but let me know if you have thoughts about the right way to do this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took another look at this, and I think I found a subtle bug in the casing logic. Should be fixed now.