From 39e88ef16d98db1326b3821e93cb941406852829 Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Wed, 10 Jan 2024 03:56:29 -0500
Subject: [PATCH] fix(ingest/bigquery): support google-cloud-bigquery 3.15.0 (#9595)

---
 .../src/datahub/emitter/sql_parsing_builder.py | 2 +-
 .../src/datahub/ingestion/run/pipeline.py | 4 ++++
 .../src/datahub/ingestion/source/ge_data_profiler.py | 10 +++++++---
 .../ingestion/source/snowflake/snowflake_lineage_v2.py | 4 ++--
 4 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py b/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py
index ea5ebf705707a1..046b615bd4e9fa 100644
--- a/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py
+++ b/metadata-ingestion/src/datahub/emitter/sql_parsing_builder.py
@@ -92,7 +92,7 @@ class SqlParsingBuilder:
     def __post_init__(self) -> None:
         if self.usage_config:
             self._usage_aggregator = UsageAggregator(self.usage_config)
-        else:
+        elif self.generate_usage_statistics:
             logger.info("No usage config provided, not generating usage statistics")
             self.generate_usage_statistics = False
 
diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
index 1641d71aba1996..70ff6992645e71 100644
--- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
+++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -137,6 +137,7 @@ class CliReport(Report):
 
     disk_info: Optional[dict] = None
     peak_disk_usage: Optional[str] = None
+    _initial_disk_usage: int = -1
     _peak_disk_usage: int = 0
 
     thread_count: Optional[int] = None
@@ -156,12 +157,15 @@ def compute_stats(self) -> None:
 
         try:
             disk_usage = shutil.disk_usage("/")
+            if self._initial_disk_usage < 0:
+                self._initial_disk_usage = disk_usage.used
             if self._peak_disk_usage < disk_usage.used:
                 self._peak_disk_usage = disk_usage.used
                 self.peak_disk_usage = humanfriendly.format_size(self._peak_disk_usage)
             self.disk_info = {
                 "total": humanfriendly.format_size(disk_usage.total),
                 "used": humanfriendly.format_size(disk_usage.used),
+                "used_initially": humanfriendly.format_size(self._initial_disk_usage),
                 "free": humanfriendly.format_size(disk_usage.free),
             }
         except Exception as e:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
index abb415c90cc8b0..4f1ad00b1e425d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ge_data_profiler.py
@@ -1283,9 +1283,13 @@ def create_bigquery_temp_table(
         # temporary table dance. However, that would require either a) upgrading to
         # use GE's batch v3 API or b) bypassing GE altogether.
 
-        query_job: Optional[
-            "google.cloud.bigquery.job.query.QueryJob"
-        ] = cursor._query_job
+        query_job: Optional["google.cloud.bigquery.job.query.QueryJob"] = (
+            # In google-cloud-bigquery 3.15.0, the _query_job attribute was
+            # made public and renamed to query_job.
+            cursor.query_job
+            if hasattr(cursor, "query_job")
+            else cursor._query_job  # type: ignore[attr-defined]
+        )
         assert query_job
         temp_destination_table = query_job.destination
         bigquery_temp_table = f"{temp_destination_table.project}.{temp_destination_table.dataset_id}.{temp_destination_table.table_id}"
diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
index 4219533dc217ca..142dbbf12f010c 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_lineage_v2.py
@@ -353,13 +353,13 @@ def _populate_external_lineage_map(self, discovered_tables: List[str]) -> None:
 
         self._populate_external_lineage_from_copy_history(discovered_tables)
         logger.info(
-            "Done populating external lineage from copy history."
+            "Done populating external lineage from copy history. "
             f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far."
         )
 
         self._populate_external_lineage_from_show_query(discovered_tables)
         logger.info(
-            "Done populating external lineage from show external tables."
+            "Done populating external lineage from show external tables. "
             f"Found {self.report.num_external_table_edges_scanned} external lineage edges so far."
         )
 
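
Illustration, not part of the patch above: a minimal, standalone sketch of the version-compatibility pattern the ge_data_profiler.py hunk applies, namely preferring the public query_job attribute introduced in google-cloud-bigquery 3.15.0 and falling back to the legacy private _query_job on older releases. The cursor classes and the get_query_job helper below are hypothetical stand-ins for the real BigQuery DB-API cursor, used only to exercise the pattern; they are not part of the DataHub codebase.

from typing import Any, Optional


class _LegacyCursor:
    """Stand-in for a pre-3.15.0 cursor: only the private attribute exists."""

    def __init__(self, job: Any) -> None:
        self._query_job = job


class _ModernCursor:
    """Stand-in for a 3.15.0+ cursor: the attribute is public."""

    def __init__(self, job: Any) -> None:
        self.query_job = job


def get_query_job(cursor: Any) -> Optional[Any]:
    # Same shape as the patched code: use the public attribute when the
    # installed client exposes it, otherwise fall back to the private one
    # so older library versions keep working.
    return (
        cursor.query_job if hasattr(cursor, "query_job") else cursor._query_job
    )


if __name__ == "__main__":
    assert get_query_job(_LegacyCursor("job-old")) == "job-old"
    assert get_query_job(_ModernCursor("job-new")) == "job-new"

The hasattr check means the same code path works whether or not the installed client is new enough to expose the public attribute, so no version pinning or conditional imports are needed.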