From 7d894925e61af159aa0ff0bd20605bb70d52add8 Mon Sep 17 00:00:00 2001
From: Mayuri N
Date: Mon, 9 Sep 2024 15:42:01 +0530
Subject: [PATCH 1/2] fix(ingest/bq): fix ordering of queries for use_queries_v2

---
 .../source/bigquery_v2/queries_extractor.py   | 38 ++++++++++++++-----
 ...ineage_via_temp_table_disordered_add.json} |  0
 .../unit/sql_parsing/test_sql_aggregator.py   |  7 +++-
 3 files changed, 34 insertions(+), 11 deletions(-)
 rename metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/{test_lineage_via_temp_table_disordered_add.json => test_table_lineage_via_temp_table_disordered_add.json} (100%)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
index 2719d8b95bea83..7b42efe621c166 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
@@ -249,14 +249,28 @@ def get_workunits_internal(
         with self.report.audit_log_preprocessing_timer:
             # Preprocessing stage that deduplicates the queries using query hash per usage bucket
-            queries_deduped: FileBackedDict[Dict[int, ObservedQuery]]
+            # Using regular dictionary with
+            # key: usage bucket
+            # value: File backed dictionary with query hash as key and observed query as value
+            # This structure is chosen in order to maintain order of execution of queries
+
+            queries_deduped: Dict[int, FileBackedDict[ObservedQuery]]
             queries_deduped = self.deduplicate_queries(queries)
 
-            self.report.num_unique_queries = len(queries_deduped)
+            self.report.num_unique_queries = len(
+                set(
+                    query_hash
+                    for bucket in queries_deduped.values()
+                    for query_hash in bucket
+                )
+            )
 
         with self.report.audit_log_load_timer:
             i = 0
-            for query_instances in queries_deduped.values():
-                for _, query in query_instances.items():
+            for queries_in_bucket in queries_deduped.values():
+                # Assuming that FileBackedDict is an OrderedDict.
+                # If this is not the case, we would need to order it by "last_query_timestamp"
+                # using queries_in_bucket.sql_query_iterator.
+                for query in queries_in_bucket.values():
                     if i > 0 and i % 10000 == 0:
                         logger.info(f"Added {i} query log entries to SQL aggregator")
@@ -267,7 +281,7 @@ def get_workunits_internal(
     def deduplicate_queries(
         self, queries: FileBackedList[ObservedQuery]
-    ) -> FileBackedDict[Dict[int, ObservedQuery]]:
+    ) -> Dict[int, FileBackedDict[ObservedQuery]]:
 
         # This fingerprint based deduplication is done here to reduce performance hit due to
         # repetitive sql parsing while adding observed query to aggregator that would otherwise
         # parse same query multiple times. In future, aggregator may absorb this deduplication.
         # With current implementation, it is possible that "Operation"(e.g. INSERT) is reported
         # only once per day, although it may have happened multiple times throughout the day.
 
-        queries_deduped: FileBackedDict[Dict[int, ObservedQuery]] = FileBackedDict()
+        queries_deduped: Dict[int, FileBackedDict[ObservedQuery]] = dict()
 
         for i, query in enumerate(queries):
             if i > 0 and i % 10000 == 0:
@@ -294,14 +308,20 @@ def deduplicate_queries(
                 query.query, self.identifiers.platform, fast=True
             )
 
-            query_instances = queries_deduped.setdefault(query.query_hash, {})
+            if time_bucket not in queries_deduped:
+                # TODO: Cleanup, etc as required for file backed dicts after use
+                queries_deduped[time_bucket] = FileBackedDict[ObservedQuery](
+                    extra_columns={"last_query_timestamp": lambda e: e.timestamp}
+                )
 
-            observed_query = query_instances.setdefault(time_bucket, query)
+            observed_query = queries_deduped[time_bucket].get(query.query_hash)
 
             # If the query already exists for this time bucket, update its attributes
-            if observed_query is not query:
+            if observed_query is not None:
                 observed_query.usage_multiplier += 1
                 observed_query.timestamp = query.timestamp
+            else:
+                queries_deduped[time_bucket][query.query_hash] = query
 
         return queries_deduped
diff --git a/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_lineage_via_temp_table_disordered_add.json b/metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_lineage_via_temp_table_disordered_add.json
similarity index 100%
rename from metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_lineage_via_temp_table_disordered_add.json
rename to metadata-ingestion/tests/unit/sql_parsing/aggregator_goldens/test_table_lineage_via_temp_table_disordered_add.json
diff --git a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
index 2e15dabb10d114..f3b809d516e7d6 100644
--- a/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
+++ b/metadata-ingestion/tests/unit/sql_parsing/test_sql_aggregator.py
@@ -530,7 +530,9 @@ def test_create_table_query_mcps(pytestconfig: pytest.Config) -> None:
 
 
 @freeze_time(FROZEN_TIME)
-def test_lineage_via_temp_table_disordered_add(pytestconfig: pytest.Config) -> None:
+def test_table_lineage_via_temp_table_disordered_add(
+    pytestconfig: pytest.Config,
+) -> None:
     aggregator = SqlParsingAggregator(
         platform="redshift",
         generate_lineage=True,
@@ -554,7 +556,8 @@ def test_lineage_via_temp_table_disordered_add(pytestconfig: pytest.Config) -> None:
     mce_helpers.check_goldens_stream(
         pytestconfig,
         outputs=mcps,
-        golden_path=RESOURCE_DIR / "test_lineage_via_temp_table_disordered_add.json",
+        golden_path=RESOURCE_DIR
+        / "test_table_lineage_via_temp_table_disordered_add.json",
     )

From cf1077745c1e9f97cc0d483adf60f37f4bf7189b Mon Sep 17 00:00:00 2001
From: Mayuri N
Date: Tue, 10 Sep 2024 12:48:38 +0530
Subject: [PATCH 2/2] use order by sql

---
 .../ingestion/source/bigquery_v2/queries_extractor.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
index 7b42efe621c166..f5555f37429d46 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/queries_extractor.py
@@ -267,10 +267,11 @@ def get_workunits_internal(
         with self.report.audit_log_load_timer:
             i = 0
             for queries_in_bucket in queries_deduped.values():
-                # Assuming that FileBackedDict is an OrderedDict.
-                # If this is not the case, we would need to order it by "last_query_timestamp"
-                # using queries_in_bucket.sql_query_iterator.
-                for query in queries_in_bucket.values():
+                # Ordering is essential for column-level lineage via temporary table
+                for row in queries_in_bucket.sql_query_iterator(
+                    "select value from data order by last_query_timestamp asc",
+                ):
+                    query = queries_in_bucket.deserializer(row["value"])
                     if i > 0 and i % 10000 == 0:
                         logger.info(f"Added {i} query log entries to SQL aggregator")
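
Note on the approach in PATCH 2/2: FileBackedDict keeps its entries in a local SQLite table (the "data" table queried above), and the first patch registers "last_query_timestamp" as an extra column when each per-bucket dictionary is created. That lets the extractor replay a bucket's queries in execution order with a plain ORDER BY, which is what the sql_query_iterator call does. The sketch below is a minimal, self-contained illustration of that pattern using only the Python standard library; the TimestampOrderedStore class, its schema, and the ObservedQuery stand-in are hypothetical and are not DataHub's actual FileBackedDict implementation.

# Minimal sketch of the "order by extra timestamp column" replay pattern.
# Hypothetical stand-in for illustration only, not DataHub's FileBackedDict.
import json
import sqlite3
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Iterator, Optional


@dataclass
class ObservedQuery:
    # Stand-in for the real ObservedQuery dataclass.
    query: str
    timestamp: datetime
    usage_multiplier: int = 1


class TimestampOrderedStore:
    """Dict-like store keyed by query hash that can replay values in timestamp order."""

    def __init__(self) -> None:
        self.conn = sqlite3.connect(":memory:")
        # The timestamp is stored as a separate column so SQLite can sort on it.
        self.conn.execute(
            "CREATE TABLE data (key TEXT PRIMARY KEY, value TEXT, last_query_timestamp TEXT)"
        )

    def put(self, key: str, q: ObservedQuery) -> None:
        self.conn.execute(
            "INSERT OR REPLACE INTO data VALUES (?, ?, ?)",
            (key, self._serialize(q), q.timestamp.isoformat()),
        )

    def get(self, key: str) -> Optional[ObservedQuery]:
        row = self.conn.execute(
            "SELECT value FROM data WHERE key = ?", (key,)
        ).fetchone()
        return self._deserialize(row[0]) if row else None

    def ordered_values(self) -> Iterator[ObservedQuery]:
        # Same idea as: select value from data order by last_query_timestamp asc
        for (value,) in self.conn.execute(
            "SELECT value FROM data ORDER BY last_query_timestamp ASC"
        ):
            yield self._deserialize(value)

    @staticmethod
    def _serialize(q: ObservedQuery) -> str:
        return json.dumps(
            {"query": q.query, "ts": q.timestamp.isoformat(), "mult": q.usage_multiplier}
        )

    @staticmethod
    def _deserialize(value: str) -> ObservedQuery:
        d = json.loads(value)
        return ObservedQuery(d["query"], datetime.fromisoformat(d["ts"]), d["mult"])


if __name__ == "__main__":
    store = TimestampOrderedStore()
    base = datetime(2024, 9, 9, tzinfo=timezone.utc)
    # Inserted out of order on purpose; replay still follows execution order.
    store.put("hash-insert", ObservedQuery("INSERT INTO t SELECT * FROM tmp", base.replace(hour=2)))
    store.put("hash-create", ObservedQuery("CREATE TEMP TABLE tmp AS SELECT 1 AS x", base.replace(hour=1)))
    for q in store.ordered_values():
        print(q.timestamp.isoformat(), q.query)

Because ISO-8601 timestamps with a fixed UTC offset sort lexicographically, ordering on the text column is sufficient for this sketch.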