Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job_id/project_id to adapter response to enable easy job linking (fixed) #250

Merged
merged 1 commit into from
Aug 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Under the Hood-20220806-142912.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Under the Hood
body: Add location/job_id/project_id to adapter response to enable easy job linking
time: 2022-08-06T14:29:12.271054+02:00
custom:
Author: Kayrnt
Issue: "92"
PR: "250"
56 changes: 40 additions & 16 deletions dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ class BigQueryConnectionMethod(StrEnum):
@dataclass
class BigQueryAdapterResponse(AdapterResponse):
    """AdapterResponse extended with BigQuery job metadata.

    The extra fields are populated from the executed ``QueryJob`` so that
    callers can link directly to the job in the BigQuery console
    (see ``_bq_job_link``). All fields are ``None`` when the information
    was not available for the statement that ran.
    """

    # Bytes scanned by the query (QueryJob.total_bytes_processed), if reported.
    bytes_processed: Optional[int] = None
    # Region/location the job ran in (QueryJob.location) — part of the console URL.
    location: Optional[str] = None
    # GCP project that owns the job (QueryJob.project).
    project_id: Optional[str] = None
    # BigQuery job identifier (QueryJob.job_id).
    job_id: Optional[str] = None


@dataclass
Expand Down Expand Up @@ -188,6 +191,12 @@ class BigQueryConnectionManager(BaseConnectionManager):
@classmethod
def handle_error(cls, error, message):
    """Convert a google-cloud-bigquery error into a dbt DatabaseException.

    Joins all sub-error messages from the API error into one message and
    raises ``DatabaseException``. If the error carries the failed
    ``query_job`` (as BigQuery API errors for query jobs do), a console
    deep-link to that job is logged first to make debugging easier.

    :param error: google.api_core exception with an ``errors`` list of
        dicts, each containing a ``"message"`` key.
    :param message: contextual message from the caller (currently unused
        in the raised exception; kept for interface compatibility).
    :raises DatabaseException: always.
    """
    error_msg = "\n".join([item["message"] for item in error.errors])
    if hasattr(error, "query_job"):
        # QueryJob exposes `project` (not `project_id`); see the
        # google-cloud-bigquery QueryJob API reference.
        logger.error(
            cls._bq_job_link(
                error.query_job.location, error.query_job.project, error.query_job.job_id
            )
        )
    raise DatabaseException(error_msg)

def clear_transaction(self):
Expand Down Expand Up @@ -446,55 +455,70 @@ def execute(
code = None
num_rows = None
bytes_processed = None
location = None
job_id = None
project_id = None
num_rows_formatted = None
processed_bytes = None

if query_job.statement_type == "CREATE_VIEW":
code = "CREATE VIEW"

elif query_job.statement_type == "CREATE_TABLE_AS_SELECT":
code = "CREATE TABLE"
conn = self.get_thread_connection()
client = conn.handle
query_table = client.get_table(query_job.destination)
code = "CREATE TABLE"
num_rows = query_table.num_rows
num_rows_formated = self.format_rows_number(num_rows)
bytes_processed = query_job.total_bytes_processed
processed_bytes = self.format_bytes(bytes_processed)
message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)"

elif query_job.statement_type == "SCRIPT":
code = "SCRIPT"
bytes_processed = query_job.total_bytes_processed
message = f"{code} ({self.format_bytes(bytes_processed)} processed)"

elif query_job.statement_type in ["INSERT", "DELETE", "MERGE", "UPDATE"]:
code = query_job.statement_type
num_rows = query_job.num_dml_affected_rows
num_rows_formated = self.format_rows_number(num_rows)
bytes_processed = query_job.total_bytes_processed
processed_bytes = self.format_bytes(bytes_processed)
message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)"

elif query_job.statement_type == "SELECT":
code = "SELECT"
conn = self.get_thread_connection()
client = conn.handle
# use anonymous table for num_rows
query_table = client.get_table(query_job.destination)
code = "SELECT"
num_rows = query_table.num_rows
num_rows_formated = self.format_rows_number(num_rows)
bytes_processed = query_job.total_bytes_processed
processed_bytes = self.format_bytes(bytes_processed)
message = f"{code} ({num_rows_formated} rows, {processed_bytes} processed)"

# set common attributes
bytes_processed = query_job.total_bytes_processed
processed_bytes = self.format_bytes(bytes_processed)
location = query_job.location
job_id = query_job.job_id
project_id = query_job.project
if num_rows is not None:
num_rows_formatted = self.format_rows_number(num_rows)
message = f"{code} ({num_rows_formatted} rows, {processed_bytes} processed)"
elif bytes_processed is not None:
message = f"{code} ({processed_bytes} processed)"
else:
message = f"{code}"

if location is not None and job_id is not None and project_id is not None:
logger.debug(self._bq_job_link(job_id, project_id, location))

response = BigQueryAdapterResponse( # type: ignore[call-arg]
_message=message,
rows_affected=num_rows,
code=code,
bytes_processed=bytes_processed,
location=location,
project_id=project_id,
job_id=job_id,
)

return response, table

@staticmethod
def _bq_job_link(location, project_id, job_id) -> str:
return f"https://console.cloud.google.com/bigquery?project={project_id}&j=bq:{location}:{job_id}&page=queryresults"

def get_partitions_metadata(self, table):
def standard_to_legacy(table):
return table.project + ":" + table.dataset + "." + table.identifier
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ def test__bigquery_adapter_functions(self):
results = self.run_dbt()
self.assertEqual(len(results), 3)

for result in results:
# all queries in adapter models are jobs that are expected to have a location/project_id/job_id
assert result.adapter_response["location"] is not None
assert result.adapter_response["project_id"] is not None
assert result.adapter_response["job_id"] is not None

test_results = self.run_dbt(['test'])

self.assertTrue(len(test_results) > 0)
Expand Down