From eac72afa120c0cd5624e3b3c072702e0105ec683 Mon Sep 17 00:00:00 2001 From: Christophe Oudar Date: Sat, 6 Aug 2022 14:31:20 +0200 Subject: [PATCH] Add location/job_id/project_id to adapter response to enable easy job linking --- .../Under the Hood-20220806-142912.yaml | 7 +++ dbt/adapters/bigquery/connections.py | 56 +++++++++++++------ .../test_bigquery_adapter_functions.py | 6 ++ 3 files changed, 53 insertions(+), 16 deletions(-) create mode 100644 .changes/unreleased/Under the Hood-20220806-142912.yaml diff --git a/.changes/unreleased/Under the Hood-20220806-142912.yaml b/.changes/unreleased/Under the Hood-20220806-142912.yaml new file mode 100644 index 000000000..6581f8e22 --- /dev/null +++ b/.changes/unreleased/Under the Hood-20220806-142912.yaml @@ -0,0 +1,7 @@ +kind: Under the Hood +body: Add location/job_id/project_id to adapter response to enable easy job linking +time: 2022-08-06T14:29:12.271054+02:00 +custom: + Author: Kayrnt + Issue: "92" + PR: "250" diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index da7b0f687..502a7b9dd 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -86,6 +86,9 @@ class BigQueryConnectionMethod(StrEnum): @dataclass class BigQueryAdapterResponse(AdapterResponse): bytes_processed: Optional[int] = None + location: Optional[str] = None + project_id: Optional[str] = None + job_id: Optional[str] = None @dataclass @@ -188,6 +191,12 @@ class BigQueryConnectionManager(BaseConnectionManager): @classmethod def handle_error(cls, error, message): error_msg = "\n".join([item["message"] for item in error.errors]) + if hasattr(error, "query_job"): + logger.error( + cls._bq_job_link( + error.query_job.location, error.query_job.project, error.query_job.job_id + ) + ) raise DatabaseException(error_msg) def clear_transaction(self): @@ -446,55 +455,70 @@ def execute( code = None num_rows = None bytes_processed = None + location = None + job_id = None + project_id = None + num_rows_formatted = None + processed_bytes = None if query_job.statement_type == "CREATE_VIEW": code = "CREATE VIEW" elif query_job.statement_type == "CREATE_TABLE_AS_SELECT": + code = "CREATE TABLE" conn = self.get_thread_connection() client = conn.handle query_table = client.get_table(query_job.destination) - code = "CREATE TABLE" num_rows = query_table.num_rows - num_rows_formated = self.format_rows_number(num_rows) - bytes_processed = query_job.total_bytes_processed - processed_bytes = self.format_bytes(bytes_processed) - message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)" elif query_job.statement_type == "SCRIPT": code = "SCRIPT" - bytes_processed = query_job.total_bytes_processed - message = f"{code} ({self.format_bytes(bytes_processed)} processed)" elif query_job.statement_type in ["INSERT", "DELETE", "MERGE", "UPDATE"]: code = query_job.statement_type num_rows = query_job.num_dml_affected_rows - num_rows_formated = self.format_rows_number(num_rows) - bytes_processed = query_job.total_bytes_processed - processed_bytes = self.format_bytes(bytes_processed) - message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)" elif query_job.statement_type == "SELECT": + code = "SELECT" conn = self.get_thread_connection() client = conn.handle # use anonymous table for num_rows query_table = client.get_table(query_job.destination) - code = "SELECT" num_rows = query_table.num_rows - num_rows_formated = self.format_rows_number(num_rows) - bytes_processed = query_job.total_bytes_processed - processed_bytes = self.format_bytes(bytes_processed) - message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)" + + # set common attributes + bytes_processed = query_job.total_bytes_processed + processed_bytes = self.format_bytes(bytes_processed) + location = query_job.location + job_id = query_job.job_id + project_id = query_job.project + if num_rows is not None: + num_rows_formatted = self.format_rows_number(num_rows) + message = f"{code} ({num_rows_formatted} rows, {processed_bytes} processed)" + elif bytes_processed is not None: + message = f"{code} ({processed_bytes} processed)" + else: + message = f"{code}" + + if location is not None and job_id is not None and project_id is not None: + logger.debug(self._bq_job_link(job_id, project_id, location)) response = BigQueryAdapterResponse( # type: ignore[call-arg] _message=message, rows_affected=num_rows, code=code, bytes_processed=bytes_processed, + location=location, + project_id=project_id, + job_id=job_id, ) return response, table + @staticmethod + def _bq_job_link(location, project_id, job_id) -> str: + return f"https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults" + def get_partitions_metadata(self, table): def standard_to_legacy(table): return table.project + ":" + table.dataset + "." + table.identifier diff --git a/tests/integration/bigquery_test/test_bigquery_adapter_functions.py b/tests/integration/bigquery_test/test_bigquery_adapter_functions.py index 06e230f30..8ad4a27fc 100644 --- a/tests/integration/bigquery_test/test_bigquery_adapter_functions.py +++ b/tests/integration/bigquery_test/test_bigquery_adapter_functions.py @@ -21,6 +21,12 @@ def test__bigquery_adapter_functions(self): results = self.run_dbt() self.assertEqual(len(results), 3) + for result in results: + # all queries in adapter models are jobs that are expected to have a location/project_id/job_id + assert result.adapter_response["location"] is not None + assert result.adapter_response["project_id"] is not None + assert result.adapter_response["job_id"] is not None + test_results = self.run_dbt(['test']) self.assertTrue(len(test_results) > 0)