From 81ef226a83b22f5988c2eba06543d85e8bc613bd Mon Sep 17 00:00:00 2001
From: Chenyu Li
Date: Tue, 26 Apr 2022 10:20:02 -0600
Subject: [PATCH 1/3] fix validate connection failing

---
 dbt/adapters/bigquery/connections.py   | 3 +--
 tests/functional/adapter/test_basic.py | 4 ++++
 2 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py
index 5a5b83044..d8d759494 100644
--- a/dbt/adapters/bigquery/connections.py
+++ b/dbt/adapters/bigquery/connections.py
@@ -380,8 +380,7 @@ def raw_execute(self, sql, fetch=False, *, use_legacy_sql=False):
         client = conn.handle
 
         fire_event(SQLQuery(conn_name=conn.name, sql=sql))
-
-        if self.profile.query_comment and self.profile.query_comment.job_label:
+        if hasattr(self.profile, 'query_comment') and self.profile.query_comment and self.profile.query_comment.job_label:
             query_comment = self.query_header.comment.query_comment
             labels = self._labels_from_query_comment(query_comment)
         else:
diff --git a/tests/functional/adapter/test_basic.py b/tests/functional/adapter/test_basic.py
index fceef2dee..2c07a6e98 100644
--- a/tests/functional/adapter/test_basic.py
+++ b/tests/functional/adapter/test_basic.py
@@ -12,6 +12,7 @@
 from dbt.tests.adapter.basic.test_snapshot_check_cols import BaseSnapshotCheckCols
 from dbt.tests.adapter.basic.test_snapshot_timestamp import BaseSnapshotTimestamp
 from dbt.tests.adapter.basic.test_adapter_methods import BaseAdapterMethod
+from dbt.tests.adapter.basic.test_validate_connection import BaseValidateConnection
 
 
 class TestSimpleMaterializationsBigQuery(BaseSimpleMaterializations):
@@ -54,3 +55,6 @@ class TestSnapshotTimestampBigQuery(BaseSnapshotTimestamp):
 
 class TestBaseAdapterMethodBigQuery(BaseAdapterMethod):
     pass
+
+class TestBigQueryValidateConnection(BaseValidateConnection):
+    pass
\ No newline at end of file

From 7eaecc3fecc382a206747c2f05ae27ea1d5a0f76 Mon Sep 17 00:00:00 2001
From: Chenyu Li
Date: Tue, 26 Apr 2022 16:17:44 -0600
Subject: [PATCH 2/3] fix code check

---
 dbt/adapters/bigquery/connections.py | 56 +++++++++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 43 insertions(+), 13 deletions(-)

diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py
index d8d759494..3f0121e8f 100644
--- a/dbt/adapters/bigquery/connections.py
+++ b/dbt/adapters/bigquery/connections.py
@@ -275,11 +275,15 @@ def get_bigquery_credentials(cls, profile_credentials):
 
         elif method == BigQueryConnectionMethod.SERVICE_ACCOUNT:
             keyfile = profile_credentials.keyfile
-            return creds.from_service_account_file(keyfile, scopes=profile_credentials.scopes)
+            return creds.from_service_account_file(
+                keyfile, scopes=profile_credentials.scopes
+            )
 
         elif method == BigQueryConnectionMethod.SERVICE_ACCOUNT_JSON:
             details = profile_credentials.keyfile_json
-            return creds.from_service_account_info(details, scopes=profile_credentials.scopes)
+            return creds.from_service_account_info(
+                details, scopes=profile_credentials.scopes
+            )
 
         elif method == BigQueryConnectionMethod.OAUTH_SECRETS:
             return GoogleCredentials.Credentials(
@@ -338,7 +342,8 @@ def open(cls, connection):
 
         except Exception as e:
             logger.debug(
-                "Got an error when attempting to create a bigquery " "client: '{}'".format(e)
+                "Got an error when attempting to create a bigquery "
+                "client: '{}'".format(e)
             )
             connection.handle = None
 
@@ -380,7 +385,11 @@ def raw_execute(self, sql, fetch=False, *, use_legacy_sql=False):
         client = conn.handle
 
         fire_event(SQLQuery(conn_name=conn.name, sql=sql))
-        if hasattr(self.profile, 'query_comment') and self.profile.query_comment and self.profile.query_comment.job_label:
+        if (
+            hasattr(self.profile, "query_comment")
+            and self.profile.query_comment
+            and self.profile.query_comment.job_label
+        ):
             query_comment = self.query_header.comment.query_comment
             labels = self._labels_from_query_comment(query_comment)
         else:
@@ -474,7 +483,10 @@ def execute(
         message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)"
 
         response = BigQueryAdapterResponse(  # type: ignore[call-arg]
-            _message=message, rows_affected=num_rows, code=code, bytes_processed=bytes_processed
+            _message=message,
+            rows_affected=num_rows,
+            code=code,
+            bytes_processed=bytes_processed,
         )
 
         return response, table
@@ -483,7 +495,9 @@ def get_partitions_metadata(self, table):
         def standard_to_legacy(table):
             return table.project + ":" + table.dataset + "." + table.identifier
 
-        legacy_sql = "SELECT * FROM [" + standard_to_legacy(table) + "$__PARTITIONS_SUMMARY__]"
+        legacy_sql = (
+            "SELECT * FROM [" + standard_to_legacy(table) + "$__PARTITIONS_SUMMARY__]"
+        )
 
         sql = self._add_query_comment(legacy_sql)
         # auto_begin is ignored on bigquery, and only included for consistency
@@ -521,15 +535,20 @@ def copy_bq_table(self, source, destination, write_disposition):
         )
 
         def copy_and_results():
-            job_config = google.cloud.bigquery.CopyJobConfig(write_disposition=write_disposition)
-            copy_job = client.copy_table(source_ref_array, destination_ref, job_config=job_config)
+            job_config = google.cloud.bigquery.CopyJobConfig(
+                write_disposition=write_disposition
+            )
+            copy_job = client.copy_table(
+                source_ref_array, destination_ref, job_config=job_config
+            )
             timeout = self.get_job_execution_timeout_seconds(conn) or 300
             iterator = copy_job.result(timeout=timeout)
             return copy_job, iterator
 
         self._retry_and_handle(
             msg='copy table "{}" to "{}"'.format(
-                ", ".join(source_ref.path for source_ref in source_ref_array), destination_ref.path
+                ", ".join(source_ref.path for source_ref in source_ref_array),
+                destination_ref.path,
             ),
             conn=conn,
             fn=copy_and_results,
@@ -537,7 +556,9 @@ def copy_and_results():
 
     @staticmethod
     def dataset_ref(database, schema):
-        return google.cloud.bigquery.DatasetReference(project=database, dataset_id=schema)
+        return google.cloud.bigquery.DatasetReference(
+            project=database, dataset_id=schema
+        )
 
     @staticmethod
     def table_ref(database, schema, table_name):
@@ -556,7 +577,9 @@ def drop_dataset(self, database, schema):
         client = conn.handle
 
         def fn():
-            return client.delete_dataset(dataset_ref, delete_contents=True, not_found_ok=True)
+            return client.delete_dataset(
+                dataset_ref, delete_contents=True, not_found_ok=True
+            )
 
         self._retry_and_handle(msg="drop dataset", conn=conn, fn=fn)
 
@@ -571,12 +594,19 @@ def fn():
 
         self._retry_and_handle(msg="create dataset", conn=conn, fn=fn)
 
     def _query_and_results(
-        self, client, sql, job_params, job_creation_timeout=None, job_execution_timeout=None
+        self,
+        client,
+        sql,
+        job_params,
+        job_creation_timeout=None,
+        job_execution_timeout=None,
     ):
         """Query the client and wait for results."""
         # Cannot reuse job_config if destination is set and ddl is used
         job_config = google.cloud.bigquery.QueryJobConfig(**job_params)
-        query_job = client.query(query=sql, job_config=job_config, timeout=job_creation_timeout)
+        query_job = client.query(
+            query=sql, job_config=job_config, timeout=job_creation_timeout
+        )
         iterator = query_job.result(timeout=job_execution_timeout)
         return query_job, iterator
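
[Editorial note] The functional change in this series is the guard added in PATCH 1/3: during connection validation, the profile object reaching raw_execute may not carry a query_comment attribute at all, so the old check `self.profile.query_comment and ...` raised AttributeError before any query ran. The hasattr() test short-circuits that path and falls back to the no-labels branch. A minimal, self-contained sketch of the before/after behavior (FakeProfile and the label dict are hypothetical stand-ins for illustration, not code from the patch):

    # Sketch of the PATCH 1/3 guard. FakeProfile is a hypothetical
    # stand-in for a profile built without query comment settings.
    class FakeProfile:
        pass  # note: no query_comment attribute

    profile = FakeProfile()

    # Old check -- raises AttributeError on such a profile:
    #   if profile.query_comment and profile.query_comment.job_label: ...

    # New check -- degrades gracefully to "no job labels":
    if (
        hasattr(profile, "query_comment")
        and profile.query_comment
        and profile.query_comment.job_label
    ):
        labels = {"source": "query_comment"}  # placeholder for real labels
    else:
        labels = {}

    assert labels == {}  # no crash, empty labels
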
From 73214727b1d7b07d3954e35e9b3b23c01e357739 Mon Sep 17 00:00:00 2001
From: Chenyu Li
Date: Wed, 27 Apr 2022 08:35:31 -0600
Subject: [PATCH 3/3] more format

---
 dbt/adapters/bigquery/connections.py | 35 +++++++++--------------------------
 1 file changed, 9 insertions(+), 26 deletions(-)

diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py
index 3f0121e8f..a445796b5 100644
--- a/dbt/adapters/bigquery/connections.py
+++ b/dbt/adapters/bigquery/connections.py
@@ -275,15 +275,11 @@ def get_bigquery_credentials(cls, profile_credentials):
 
         elif method == BigQueryConnectionMethod.SERVICE_ACCOUNT:
             keyfile = profile_credentials.keyfile
-            return creds.from_service_account_file(
-                keyfile, scopes=profile_credentials.scopes
-            )
+            return creds.from_service_account_file(keyfile, scopes=profile_credentials.scopes)
 
         elif method == BigQueryConnectionMethod.SERVICE_ACCOUNT_JSON:
             details = profile_credentials.keyfile_json
-            return creds.from_service_account_info(
-                details, scopes=profile_credentials.scopes
-            )
+            return creds.from_service_account_info(details, scopes=profile_credentials.scopes)
 
         elif method == BigQueryConnectionMethod.OAUTH_SECRETS:
             return GoogleCredentials.Credentials(
@@ -342,8 +338,7 @@ def open(cls, connection):
 
         except Exception as e:
             logger.debug(
-                "Got an error when attempting to create a bigquery "
-                "client: '{}'".format(e)
+                "Got an error when attempting to create a bigquery " "client: '{}'".format(e)
             )
             connection.handle = None
 
@@ -495,9 +490,7 @@ def get_partitions_metadata(self, table):
         def standard_to_legacy(table):
             return table.project + ":" + table.dataset + "." + table.identifier
 
-        legacy_sql = (
-            "SELECT * FROM [" + standard_to_legacy(table) + "$__PARTITIONS_SUMMARY__]"
-        )
+        legacy_sql = "SELECT * FROM [" + standard_to_legacy(table) + "$__PARTITIONS_SUMMARY__]"
 
         sql = self._add_query_comment(legacy_sql)
         # auto_begin is ignored on bigquery, and only included for consistency
@@ -535,12 +528,8 @@ def copy_bq_table(self, source, destination, write_disposition):
         )
 
         def copy_and_results():
-            job_config = google.cloud.bigquery.CopyJobConfig(
-                write_disposition=write_disposition
-            )
-            copy_job = client.copy_table(
-                source_ref_array, destination_ref, job_config=job_config
-            )
+            job_config = google.cloud.bigquery.CopyJobConfig(write_disposition=write_disposition)
+            copy_job = client.copy_table(source_ref_array, destination_ref, job_config=job_config)
             timeout = self.get_job_execution_timeout_seconds(conn) or 300
             iterator = copy_job.result(timeout=timeout)
             return copy_job, iterator
@@ -556,9 +545,7 @@ def copy_and_results():
 
     @staticmethod
     def dataset_ref(database, schema):
-        return google.cloud.bigquery.DatasetReference(
-            project=database, dataset_id=schema
-        )
+        return google.cloud.bigquery.DatasetReference(project=database, dataset_id=schema)
 
     @staticmethod
     def table_ref(database, schema, table_name):
@@ -577,9 +564,7 @@ def drop_dataset(self, database, schema):
         client = conn.handle
 
         def fn():
-            return client.delete_dataset(
-                dataset_ref, delete_contents=True, not_found_ok=True
-            )
+            return client.delete_dataset(dataset_ref, delete_contents=True, not_found_ok=True)
 
         self._retry_and_handle(msg="drop dataset", conn=conn, fn=fn)
 
@@ -604,9 +589,7 @@ def _query_and_results(
     ):
         """Query the client and wait for results."""
         # Cannot reuse job_config if destination is set and ddl is used
         job_config = google.cloud.bigquery.QueryJobConfig(**job_params)
-        query_job = client.query(
-            query=sql, job_config=job_config, timeout=job_creation_timeout
-        )
+        query_job = client.query(query=sql, job_config=job_config, timeout=job_creation_timeout)
         iterator = query_job.result(timeout=job_execution_timeout)
         return query_job, iterator
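
[Editorial note] PATCH 1/3 also opts the BigQuery adapter into the shared validate-connection test purely by inheritance: the base class ships the test methods, and the empty subclass registers them for this adapter. A minimal sketch of that pytest pattern (BaseValidateConnection's real body lives in the dbt-tests-adapter package; the stand-in below is illustrative, not its actual implementation):

    # Sketch of the inheritance-only test registration pattern used in
    # tests/functional/adapter/test_basic.py. The base-class body here is
    # a hypothetical stand-in; pytest collects the Test*-named subclass.
    class BaseValidateConnection:
        def test_validate_connection(self, project):
            # Stand-in assertion: the real base test verifies that the
            # adapter can open a connection with the configured profile.
            project.adapter.acquire_connection("validate")

    class TestBigQueryValidateConnection(BaseValidateConnection):
        pass  # inherits test_validate_connection unchanged

On PATCH 3/3: it unwraps most of the lines that PATCH 2/3 had just wrapped. That round-trip is consistent with the style check being re-run under a longer line-length limit (dbt repositories commonly format with black at 99 characters, though that configuration is not shown in these diffs), so single-line calls such as the restored client.query(...) pass the check again.
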