diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
index 2719d8b95bea83..f5555f37429d46 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
@@ -249,14 +249,29 @@ def get_workunits_internal(
         with self.report.audit_log_preprocessing_timer:
             # Preprocessing stage that deduplicates the queries using query hash per usage bucket
-            queries_deduped: FileBackedDict[Dict[int, ObservedQuery]]
+            # Using a regular dictionary with
+            #   key: usage bucket
+            #   value: file-backed dictionary with query hash as key and observed query as value
+            # This structure is chosen in order to maintain the order of execution of queries
+
+            queries_deduped: Dict[int, FileBackedDict[ObservedQuery]]
             queries_deduped = self.deduplicate_queries(queries)
-            self.report.num_unique_queries = len(queries_deduped)
+            self.report.num_unique_queries = len(
+                set(
+                    query_hash
+                    for bucket in queries_deduped.values()
+                    for query_hash in bucket
+                )
+            )
 
         with self.report.audit_log_load_timer:
             i = 0
-            for query_instances in queries_deduped.values():
-                for _, query in query_instances.items():
+            for queries_in_bucket in queries_deduped.values():
+                # Ordering is essential for column-level lineage via temporary table
+                for row in queries_in_bucket.sql_query_iterator(
+                    "select value from data order by last_query_timestamp asc",
+                ):
+                    query = queries_in_bucket.deserializer(row["value"])
                     if i > 0 and i % 10000 == 0:
                         logger.info(f"Added {i} query log entries to SQL aggregator")
@@ -267,7 +282,7 @@ def get_workunits_internal(
 
     def deduplicate_queries(
         self, queries: FileBackedList[ObservedQuery]
-    ) -> FileBackedDict[Dict[int, ObservedQuery]]:
+    ) -> Dict[int, FileBackedDict[ObservedQuery]]:
         # This fingerprint based deduplication is done here to reduce performance hit due to
         # repetitive sql parsing while adding observed query to aggregator that would otherwise
@@ -275,7 +290,7 @@ def deduplicate_queries(
         # With current implementation, it is possible that "Operation"(e.g. INSERT) is reported
         # only once per day, although it may have happened multiple times throughout the day.
 
-        queries_deduped: FileBackedDict[Dict[int, ObservedQuery]] = FileBackedDict()
+        queries_deduped: Dict[int, FileBackedDict[ObservedQuery]] = dict()
 
         for i, query in enumerate(queries):
             if i > 0 and i % 10000 == 0:
@@ -294,14 +309,20 @@ def deduplicate_queries(
                 query.query, self.identifiers.platform, fast=True
             )
 
-            query_instances = queries_deduped.setdefault(query.query_hash, {})
+            if time_bucket not in queries_deduped:
+                # TODO: Cleanup, etc. as required for file-backed dicts after use
+                queries_deduped[time_bucket] = FileBackedDict[ObservedQuery](
+                    extra_columns={"last_query_timestamp": lambda e: e.timestamp}
+                )
 
-            observed_query = query_instances.setdefault(time_bucket, query)
+            observed_query = queries_deduped[time_bucket].get(query.query_hash)
 
             # If the query already exists for this time bucket, update its attributes
-            if observed_query is not query:
+            if observed_query is not None:
                 observed_query.usage_multiplier += 1
                 observed_query.timestamp = query.timestamp
+            else:
+                queries_deduped[time_bucket][query.query_hash] = query
 
         return queries_deduped
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_lineage_via_temp_table_disordered_add.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_lineage_via_temp_table_disordered_add.json
similarity index 100%
rename from metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_lineage_via_temp_table_disordered_add.json
rename to metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_lineage_via_temp_table_disordered_add.json
diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
index 2e15dabb10d114..f3b809d516e7d6 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
+++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
@@ -530,7 +530,9 @@ def test_create_table_query_mcps(pytestconfig: pytest.Config) -> None:
 
 
 @freeze_time(FROZEN_TIME)
-def test_lineage_via_temp_table_disordered_add(pytestconfig: pytest.Config) -> None:
+def test_table_lineage_via_temp_table_disordered_add(
+    pytestconfig: pytest.Config,
+) -> None:
     aggregator = SqlParsingAggregator(
         platform="redshift",
         generate_lineage=True,
@@ -554,7 +556,8 @@ def test_lineage_via_temp_table_disordered_add(pytestconfig: pytest.Config) -> None:
     mce_helpers.check_goldens_stream(
         pytestconfig,
         outputs=mcps,
-        golden_path=RESOURCE_DIR / "test_lineage_via_temp_table_disordered_add.json",
+        golden_path=RESOURCE_DIR
+        / "test_table_lineage_via_temp_table_disordered_add.json",
     )
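
Reviewer note: the sketch below is a minimal, self-contained model of the deduplicate-then-replay flow in this diff, not the DataHub implementation itself. Assumptions: a plain dict stands in for FileBackedDict, the raw query text stands in for the fingerprint query_hash, and get_time_bucket/BUCKET_DURATION are hypothetical stubs for the extractor's real usage bucketing. Sorting each bucket by the last-updated timestamp plays the role of the "select value from data order by last_query_timestamp asc" iteration.

# Minimal sketch (not the DataHub implementation): per-bucket dedup followed
# by timestamp-ordered replay. A plain dict stands in for FileBackedDict, the
# query text stands in for query_hash, and get_time_bucket is a hypothetical stub.
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Iterator, List


@dataclass
class ObservedQuery:
    query: str
    timestamp: datetime
    usage_multiplier: int = 1


BUCKET_DURATION = timedelta(days=1)  # assumed bucket size for the sketch


def get_time_bucket(ts: datetime) -> int:
    # Stub: floor the timestamp to the start of its usage bucket.
    return int(ts.timestamp() // BUCKET_DURATION.total_seconds())


def deduplicate(queries: List[ObservedQuery]) -> Dict[int, Dict[str, ObservedQuery]]:
    deduped: Dict[int, Dict[str, ObservedQuery]] = {}
    for q in queries:
        bucket = deduped.setdefault(get_time_bucket(q.timestamp), {})
        seen = bucket.get(q.query)  # the real code keys on a fingerprint hash
        if seen is not None:
            # Duplicate within the bucket: bump the multiplier and keep the
            # latest timestamp, mirroring the extra "last_query_timestamp" column.
            seen.usage_multiplier += 1
            seen.timestamp = q.timestamp
        else:
            bucket[q.query] = q
    return deduped


def replay_in_order(
    deduped: Dict[int, Dict[str, ObservedQuery]]
) -> Iterator[ObservedQuery]:
    # Stands in for "select value from data order by last_query_timestamp asc":
    # within each bucket, yield queries by their last-seen timestamp, ascending.
    for bucket in deduped.values():
        yield from sorted(bucket.values(), key=lambda q: q.timestamp)


if __name__ == "__main__":
    t0 = datetime(2024, 1, 1, 9, 0)
    logs = [
        ObservedQuery("CREATE TEMP TABLE t AS SELECT * FROM src", t0),
        ObservedQuery("INSERT INTO dst SELECT * FROM t", t0 + timedelta(minutes=5)),
        ObservedQuery("INSERT INTO dst SELECT * FROM t", t0 + timedelta(minutes=30)),
    ]
    for q in replay_in_order(deduplicate(logs)):
        print(q.timestamp, q.usage_multiplier, q.query)
    # The CREATE TEMP TABLE replays first; the duplicated INSERT replays once,
    # with usage_multiplier == 2 and its latest timestamp.

Running the sketch prints the CREATE TEMP TABLE before the deduplicated INSERT, showing why replay order within a bucket follows each query's most recent occurrence rather than raw insertion order — the property the aggregator relies on when resolving lineage through temporary tables, and the one the renamed test_table_lineage_via_temp_table_disordered_add golden exercises.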