diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
index 30777e2d675d48..f3aa1d126ce6b0 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
@@ -4,7 +4,7 @@
 import tempfile
 from dataclasses import dataclass, field
 from datetime import datetime, timezone
-from typing import Dict, Iterable, List, MutableMapping, Optional, TypedDict
+from typing import Dict, Iterable, List, Optional, TypedDict
 
 from google.cloud.bigquery import Client
 from pydantic import Field
@@ -109,7 +109,8 @@ class BigQueryQueriesExtractorConfig(BigQueryBaseConfig):
 
     region_qualifiers: List[str] = Field(
         default=["region-us", "region-eu"],
-        description="BigQuery regions to be scanned for bigquery jobs. See [this](https://cloud.google.com/bigquery/docs/information-schema-jobs) for details.",
+        description="BigQuery regions to be scanned for BigQuery jobs. "
+        "See [this](https://cloud.google.com/bigquery/docs/information-schema-jobs#scope_and_syntax) for details.",
     )
 
 
@@ -234,23 +235,22 @@ def get_workunits_internal(
                 for entry in self.fetch_query_log(project):
                     self.report.num_queries_by_project[project.id] += 1
                     queries.append(entry)
-                    self.report.num_total_queries = len(queries)
+            self.report.num_total_queries = len(queries)
 
         with self.report.audit_log_preprocessing_timer:
             # Preprocessing stage that deduplicates the queries using query hash per usage bucket
-            queries_deduped: MutableMapping[str, Dict[int, ObservedQuery]]
+            queries_deduped: FileBackedDict[Dict[int, ObservedQuery]]
             queries_deduped = self.deduplicate_queries(queries)
             self.report.num_unique_queries = len(queries_deduped)
 
         with self.report.audit_log_load_timer:
             i = 0
-            # Is FileBackedDict OrderedDict ? i.e. keys / values are retrieved in same order as added ?
-            # Does aggregator expect to see queries in same order as they were executed ?
            for query_instances in queries_deduped.values():
                 for _, query in query_instances.items():
-                    if i > 0 and i % 1000 == 0:
+                    if i > 0 and i % 10000 == 0:
                         logger.info(f"Added {i} query log entries to SQL aggregator")
+                        logger.info(f"{query.query_hash}, {query.timestamp}")
 
                     self.aggregator.add(query)
                     i += 1
@@ -258,31 +258,41 @@ def deduplicate_queries(
         self, queries: FileBackedList[ObservedQuery]
-    ) -> MutableMapping[str, Dict[int, ObservedQuery]]:
+    ) -> FileBackedDict[Dict[int, ObservedQuery]]:
+
+        # This fingerprint-based deduplication is done here to reduce the performance hit of
+        # repetitive SQL parsing when adding observed queries to the aggregator, which would
+        # otherwise parse the same query multiple times. In the future, the aggregator may
+        # absorb this deduplication. With the current implementation, an "Operation" (e.g. INSERT)
+        # may be reported only once per day even though it happened multiple times during the day.
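+        #
+        # The deduped structure maps query fingerprint -> {usage bucket start (ms) -> ObservedQuery},
+        # so each unique query is fed to the aggregator at most once per usage time bucket.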
+
         queries_deduped: FileBackedDict[Dict[int, ObservedQuery]] = FileBackedDict()
-        for query in queries:
-            time_bucket = (
-                datetime_to_ts_millis(
+
+        for i, query in enumerate(queries):
+            if i > 0 and i % 10000 == 0:
+                logger.info(f"Preprocessing completed for {i} query log entries")
+
+            # query = ObservedQuery(**asdict(query))
+
+            time_bucket = 0
+            if query.timestamp:
+                time_bucket = datetime_to_ts_millis(
                     get_time_bucket(query.timestamp, self.config.window.bucket_duration)
                 )
-                if query.timestamp
-                else 0
-            )
-            query_hash = get_query_fingerprint(
+
+            # Not using original BQ query hash as it's not always present
+            query.query_hash = get_query_fingerprint(
                 query.query, self.identifiers.platform, fast=True
             )
-            query.query_hash = query_hash
-            if query_hash not in queries_deduped:
-                queries_deduped[query_hash] = {time_bucket: query}
-            else:
-                seen_query = queries_deduped[query_hash]
-                if time_bucket not in seen_query:
-                    seen_query[time_bucket] = query
-                else:
-                    observed_query = seen_query[time_bucket]
-                    observed_query.usage_multiplier += 1
-                    observed_query.timestamp = query.timestamp
-                    queries_deduped[query_hash] = seen_query
+
+            query_instances = queries_deduped.setdefault(query.query_hash, {})
+
+            observed_query = query_instances.setdefault(time_bucket, query)
+
+            # If the query already exists for this time bucket, update its attributes
+            if observed_query is not query:
+                observed_query.usage_multiplier += 1
+                observed_query.timestamp = query.timestamp
 
         return queries_deduped
@@ -292,55 +302,60 @@ def fetch_query_log(self, project: BigqueryProject) -> Iterable[ObservedQuery]:
         regions = self.config.region_qualifiers
 
         for region in regions:
-            # Each region needs to be a different query
-            query_log_query = _build_enriched_query_log_query(
-                project_id=project.id,
-                region=region,
-                start_time=self.config.window.start_time,
-                end_time=self.config.window.end_time,
-            )
-
             with self.structured_report.report_exc(
-                f"Error fetching query log from BigQuery Project {project.id} Region {region}"
+                f"Error fetching query log from BQ Project {project.id} for {region}"
             ):
-                logger.info(
-                    f"Fetching query log from BigQuery Project {project.id} Region {region}"
+                yield from self.fetch_region_query_log(project, region)
+
+    def fetch_region_query_log(
+        self, project: BigqueryProject, region: str
+    ) -> Iterable[ObservedQuery]:
+
+        # Each region needs to be a different query
+        query_log_query = _build_enriched_query_log_query(
+            project_id=project.id,
+            region=region,
+            start_time=self.config.window.start_time,
+            end_time=self.config.window.end_time,
+        )
+
+        logger.info(f"Fetching query log from BQ Project {project.id} for {region}")
+        resp = self.connection.query(query_log_query)
+
+        for i, row in enumerate(resp):
+            if i > 0 and i % 1000 == 0:
+                logger.info(f"Processed {i} query log rows so far")
+            try:
+                entry = self._parse_audit_log_row(row)
+            except Exception as e:
+                self.structured_report.warning(
+                    "Error parsing query log row",
+                    context=f"{row}",
+                    exc=e,
                 )
-                resp = self.connection.query(query_log_query)
-
-                for i, row in enumerate(resp):
-                    if i > 0 and i % 1000 == 0:
-                        logger.info(f"Processed {i} query log rows so far")
-
-                    try:
-                        entry = self._parse_audit_log_row(row)
-                    except Exception as e:
-                        self.structured_report.warning(
-                            "Error parsing query log row",
-                            context=f"{row}",
-                            exc=e,
-                        )
-                    else:
-                        yield entry
+            else:
+                yield entry
 
     def _parse_audit_log_row(self, row: BigQueryJob) -> ObservedQuery:
         timestamp: datetime = row["creation_time"]
         timestamp = timestamp.astimezone(timezone.utc)
 
+        # https://cloud.google.com/bigquery/docs/multi-statement-queries
+        # A leading _ also marks this as a temp dataset, per the `temp_table_dataset_prefix` config
+        TEMP_TABLE_QUALIFIER = "_SESSION"
+
         entry = ObservedQuery(
             query=row["query"],
             session_id=row["session_id"],
             timestamp=row["creation_time"],
-            # TODO: Move user urn generation to BigQueryIdentifierBuilder
             user=(
                 self.identifiers.gen_user_urn(row["user_email"])
                 if row["user_email"]
                 else None
             ),
             default_db=row["project_id"],
-            default_schema=None,
-            # Not using BQ query hash as it's not always present
-            # query_hash=row["query_hash"],
+            default_schema=TEMP_TABLE_QUALIFIER,
+            query_hash=row["query_hash"],
         )
 
         return entry
@@ -356,26 +371,43 @@ def _build_enriched_query_log_query(
     audit_start_time = start_time.strftime(BQ_DATETIME_FORMAT)
     audit_end_time = end_time.strftime(BQ_DATETIME_FORMAT)
 
-    # NOTE the use of creation_time as timestamp here
-    # as JOBS table is partitioned by creation_time.
-    # Using this column filter significantly reduces processed bytes.
+    # List of all statement types
+    # https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.QueryStatementType
+    unsupported_statement_types = ",".join(
+        [
+            f"'{statement_type}'"
+            for statement_type in [
+                # procedure
+                "CREATE_PROCEDURE",
+                "DROP_PROCEDURE",
+                "CALL",
+                # schema
+                "CREATE_SCHEMA",
+                "DROP_SCHEMA",
+                # function
+                "CREATE_FUNCTION",
+                "CREATE_TABLE_FUNCTION",
+                "DROP_FUNCTION",
+                # policies
+                "CREATE_ROW_ACCESS_POLICY",
+                "DROP_ROW_ACCESS_POLICY",
+            ]
+        ]
+    )
+
+    # NOTE the use of the partition column creation_time as the timestamp filter here.
+    # Currently, only the required columns are fetched. More columns, such as
+    # total_slot_ms, statement_type, job_type, destination_table, referenced_tables,
+    # total_bytes_billed, and dml_statistics (inserted_row_count, etc.), may be fetched
+    # as required in the future. Refer to the link below for the full list of columns:
+    # https://cloud.google.com/bigquery/docs/information-schema-jobs#schema
     return f"""
     SELECT
         job_id,
         project_id,
         creation_time,
-        start_time,
-        end_time,
-        total_slot_ms,
         user_email,
-        statement_type,
-        job_type,
         query,
-        destination_table,
-        referenced_tables,
-        total_bytes_billed,
-        total_bytes_processed,
-        dml_statistics,
         session_info.session_id as session_id,
         query_info.query_hashes.normalized_literals as query_hash
     FROM
@@ -384,6 +416,7 @@ def _build_enriched_query_log_query(
         creation_time >= '{audit_start_time}' AND
         creation_time <= '{audit_end_time}' AND
         error_result is null AND
-        not CONTAINS_SUBSTR(query, '.INFORMATION_SCHEMA.')
+        not CONTAINS_SUBSTR(query, '.INFORMATION_SCHEMA.') AND
+        statement_type not in ({unsupported_statement_types})
     ORDER BY creation_time
     """
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_create_table_query_mcps.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_create_table_query_mcps.json
new file mode 100644
index 00000000000000..6a5cf51dd75226
--- /dev/null
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_create_table_query_mcps.json
@@ -0,0 +1,22 @@
+[
+{
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:bigquery,dev.dataset.foo,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "operation",
+    "aspect": {
+        "json": {
+            "timestampMillis": 1707182625000,
+            "partitionSpec": {
+                "type": "FULL_TABLE",
+                "partition": "FULL_TABLE_SNAPSHOT"
+            },
+            "operationType": "CREATE",
+            "customProperties": {
+                "query_urn": "urn:li:query:f2e61c641cf14eae74147b6280ae40648516c4b7b58cfca6c4f7fb14ab255ce2"
+            },
+            "lastUpdatedTimestamp": 1707182625000
+        }
+    }
+}
+]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_lineage_via_temp_table_disordered_add.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_lineage_via_temp_table_disordered_add.json
new file mode 100644
index 00000000000000..51a732e7a24940
--- /dev/null
+++ b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_lineage_via_temp_table_disordered_add.json
@@ -0,0 +1,79 @@
+[
+{
+    "entityType": "dataset",
+    "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.derived_from_foo,PROD)",
+    "changeType": "UPSERT",
+    "aspectName": "upstreamLineage",
+    "aspect": {
+        "json": {
+            "upstreams": [
+                {
+                    "auditStamp": {
+                        "time": 1707182625000,
+                        "actor": "urn:li:corpuser:_ingestion"
+                    },
+                    "created": {
+                        "time": 0,
+                        "actor": "urn:li:corpuser:_ingestion"
+                    },
+                    "dataset": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)",
+                    "type": "TRANSFORMED",
+                    "query": "urn:li:query:composite_39f4adf89c8ad4d6d307b628c82d8260e1c5cd7eb6fb3a8cbb437421f970c16f"
+                }
+            ]
+        }
+    }
+},
+{
+    "entityType": "query",
+    "entityUrn": "urn:li:query:composite_39f4adf89c8ad4d6d307b628c82d8260e1c5cd7eb6fb3a8cbb437421f970c16f",
+    "changeType": "UPSERT",
+    "aspectName": "queryProperties",
+    "aspect": {
+        "json": {
+            "statement": {
+                "value": "CREATE TEMPORARY TABLE foo AS\nSELECT\n  a,\n  b + c AS c\nFROM bar;\n\nCREATE TABLE derived_from_foo AS\nSELECT\n  *\nFROM foo",
+                "language": "SQL"
+            },
+            "source": "SYSTEM",
+            "created": {
+                "time": 0,
+                "actor": "urn:li:corpuser:_ingestion"
+            },
+            "lastModified": {
+                "time": 1707182625000,
+                "actor": "urn:li:corpuser:_ingestion"
+            }
+        }
+    }
+},
+{
+    "entityType": "query",
+    "entityUrn": "urn:li:query:composite_39f4adf89c8ad4d6d307b628c82d8260e1c5cd7eb6fb3a8cbb437421f970c16f",
+    "changeType": "UPSERT",
+    "aspectName": "querySubjects",
+    "aspect": {
+        "json": {
+            "subjects": [
+                {
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.bar,PROD)"
+                },
+                {
+                    "entity": "urn:li:dataset:(urn:li:dataPlatform:redshift,dev.public.derived_from_foo,PROD)"
+                }
+            ]
+        }
+    }
+},
+{
+    "entityType": "query",
+    "entityUrn": "urn:li:query:composite_39f4adf89c8ad4d6d307b628c82d8260e1c5cd7eb6fb3a8cbb437421f970c16f",
+    "changeType": "UPSERT",
+    "aspectName": "dataPlatformInstance",
+    "aspect": {
+        "json": {
+            "platform": "urn:li:dataPlatform:redshift"
+        }
+    }
+}
+]
\ No newline at end of file
diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
index 0655ef5842441f..823de771fba2ea 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
+++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
@@ -524,3 +524,36 @@ def test_create_table_query_mcps(pytestconfig: pytest.Config) -> None:
         outputs=mcps,
         golden_path=RESOURCE_DIR / "test_create_table_query_mcps.json",
     )
+
+
+@freeze_time(FROZEN_TIME)
+def test_lineage_via_temp_table_disordered_add(pytestconfig: pytest.Config) -> None:
+    aggregator = SqlParsingAggregator(
+        platform="redshift",
+        generate_lineage=True,
+        generate_usage_statistics=False,
+        generate_operations=False,
+    )
+
+    aggregator._schema_resolver.add_raw_schema_info(
+        DatasetUrn("redshift", "dev.public.bar").urn(),
+        {"a": "int", "b": "int", "c": "int"},
+    )
+    aggregator.add_observed_query(
+        query="create table derived_from_foo as select * from foo",
+        default_db="dev",
+        default_schema="public",
+    )
+    aggregator.add_observed_query(
+        query="create temp table foo as select a, b+c as c from bar",
+        default_db="dev",
+        default_schema="public",
+    )
+
+    mcps = list(aggregator.gen_metadata())
+
+    mce_helpers.check_goldens_stream(
+        pytestconfig,
+        outputs=mcps,
+        golden_path=RESOURCE_DIR / "test_lineage_via_temp_table_disordered_add.json",
+    )