From 6de5baa12fb8d376ead9a919c27bb06fdf6ffb1a Mon Sep 17 00:00:00 2001
From: Mayuri N
Date: Thu, 26 Sep 2024 14:58:22 +0530
Subject: [PATCH] feat(ingest/databricks): add usage perf report

---
 .../datahub/ingestion/source/unity/report.py |  14 ++-
 .../datahub/ingestion/source/unity/usage.py  | 109 ++++++++++--------
 .../performance/databricks/test_unity.py     |   7 +-
 .../databricks/unity_proxy_mock.py           |  12 +-
 4 files changed, 87 insertions(+), 55 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py
index 02eedb67f4cc22..a00a52ae542076 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py
@@ -1,10 +1,19 @@
 from dataclasses import dataclass, field
 from typing import Optional, Tuple
 
-from datahub.ingestion.api.report import EntityFilterReport
+from datahub.ingestion.api.report import EntityFilterReport, Report
 from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
 from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
 from datahub.utilities.lossy_collections import LossyDict, LossyList
+from datahub.utilities.perf_timer import PerfTimer
+
+
+@dataclass
+class UnityCatalogUsagePerfReport(Report):
+    get_queries_timer: PerfTimer = field(default_factory=PerfTimer)
+    sql_parsing_timer: PerfTimer = field(default_factory=PerfTimer)
+    aggregator_add_event_timer: PerfTimer = field(default_factory=PerfTimer)
+    gen_operation_timer: PerfTimer = field(default_factory=PerfTimer)
 
 
 @dataclass
@@ -27,6 +36,9 @@ class UnityCatalogReport(IngestionStageReport, ProfilingSqlReport):
     num_queries_missing_table: int = 0  # Can be due to pattern filter
     num_queries_duplicate_table: int = 0
     num_queries_parsed_by_spark_plan: int = 0
+    usage_perf_report: UnityCatalogUsagePerfReport = field(
+        default_factory=UnityCatalogUsagePerfReport
+    )
 
     # Distinguish from Operations emitted for created / updated timestamps
     num_operational_stats_workunits_emitted: int = 0
diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py
index 5eec2ca587ead2..08482c9d2fa3b9 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/unity/usage.py
@@ -81,20 +81,25 @@ def _get_workunits_internal(
             table_map[f"{ref.schema}.{ref.table}"].append(ref)
             table_map[ref.qualified_table_name].append(ref)
 
-        for query in self._get_queries():
-            self.report.num_queries += 1
-            table_info = self._parse_query(query, table_map)
-            if table_info is not None:
-                if self.config.include_operational_stats:
-                    yield from self._generate_operation_workunit(query, table_info)
-                for source_table in table_info.source_tables:
-                    self.usage_aggregator.aggregate_event(
-                        resource=source_table,
-                        start_time=query.start_time,
-                        query=query.query_text,
-                        user=query.user_name,
-                        fields=[],
-                    )
+        with self.report.usage_perf_report.get_queries_timer as current_timer:
+            for query in self._get_queries():
+                self.report.num_queries += 1
+                with current_timer.pause():
+                    table_info = self._parse_query(query, table_map)
+                    if table_info is not None:
+                        if self.config.include_operational_stats:
+                            yield from self._generate_operation_workunit(
+                                query, table_info
+                            )
+                        for source_table in table_info.source_tables:
+                            with self.report.usage_perf_report.aggregator_add_event_timer:
+                                self.usage_aggregator.aggregate_event(
+                                    resource=source_table,
+                                    start_time=query.start_time,
+                                    query=query.query_text,
+                                    user=query.user_name,
+                                    fields=[],
+                                )
 
         if not self.report.num_queries:
             logger.warning("No queries found in the given time range.")
@@ -117,29 +122,34 @@ def _get_workunits_internal(
     def _generate_operation_workunit(
         self, query: Query, table_info: QueryTableInfo
     ) -> Iterable[MetadataWorkUnit]:
-        if (
-            not query.statement_type
-            or query.statement_type not in OPERATION_STATEMENT_TYPES
-        ):
-            return None
+        with self.report.usage_perf_report.gen_operation_timer:
+            if (
+                not query.statement_type
+                or query.statement_type not in OPERATION_STATEMENT_TYPES
+            ):
+                return None
 
-        # Not sure about behavior when there are multiple target tables. This is a best attempt.
-        for target_table in table_info.target_tables:
-            operation_aspect = OperationClass(
-                timestampMillis=int(time.time() * 1000),
-                lastUpdatedTimestamp=int(query.end_time.timestamp() * 1000),
-                actor=(
-                    self.user_urn_builder(query.user_name) if query.user_name else None
-                ),
-                operationType=OPERATION_STATEMENT_TYPES[query.statement_type],
-                affectedDatasets=[
-                    self.table_urn_builder(table) for table in table_info.source_tables
-                ],
-            )
-            self.report.num_operational_stats_workunits_emitted += 1
-            yield MetadataChangeProposalWrapper(
-                entityUrn=self.table_urn_builder(target_table), aspect=operation_aspect
-            ).as_workunit()
+            # Not sure about behavior when there are multiple target tables. This is a best attempt.
+            for target_table in table_info.target_tables:
+                operation_aspect = OperationClass(
+                    timestampMillis=int(time.time() * 1000),
+                    lastUpdatedTimestamp=int(query.end_time.timestamp() * 1000),
+                    actor=(
+                        self.user_urn_builder(query.user_name)
+                        if query.user_name
+                        else None
+                    ),
+                    operationType=OPERATION_STATEMENT_TYPES[query.statement_type],
+                    affectedDatasets=[
+                        self.table_urn_builder(table)
+                        for table in table_info.source_tables
+                    ],
+                )
+                self.report.num_operational_stats_workunits_emitted += 1
+                yield MetadataChangeProposalWrapper(
+                    entityUrn=self.table_urn_builder(target_table),
+                    aspect=operation_aspect,
+                ).as_workunit()
 
     def _get_queries(self) -> Iterable[Query]:
         try:
@@ -153,18 +163,23 @@
     def _parse_query(
         self, query: Query, table_map: TableMap
     ) -> Optional[QueryTableInfo]:
-        table_info = self._parse_query_via_lineage_runner(query.query_text)
-        if table_info is None and query.statement_type == QueryStatementType.SELECT:
-            table_info = self._parse_query_via_spark_sql_plan(query.query_text)
+        with self.report.usage_perf_report.sql_parsing_timer:
+            table_info = self._parse_query_via_lineage_runner(query.query_text)
+            if table_info is None and query.statement_type == QueryStatementType.SELECT:
+                table_info = self._parse_query_via_spark_sql_plan(query.query_text)
 
-        if table_info is None:
-            self.report.num_queries_dropped_parse_failure += 1
-            return None
-        else:
-            return QueryTableInfo(
-                source_tables=self._resolve_tables(table_info.source_tables, table_map),
-                target_tables=self._resolve_tables(table_info.target_tables, table_map),
-            )
+            if table_info is None:
+                self.report.num_queries_dropped_parse_failure += 1
+                return None
+            else:
+                return QueryTableInfo(
+                    source_tables=self._resolve_tables(
+                        table_info.source_tables, table_map
+                    ),
+                    target_tables=self._resolve_tables(
+                        table_info.target_tables, table_map
+                    ),
+                )
 
     def _parse_query_via_lineage_runner(self, query: str) -> Optional[StringTableInfo]:
         try:
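A note on the timer pattern above: get_queries_timer is paused while each query is processed, so the nested sql_parsing_timer and aggregator_add_event_timer measure their own spans without that time being double-counted in the outer fetch total. Below is a minimal, self-contained sketch of a pause-aware timer illustrating the idea; it is not DataHub's actual PerfTimer implementation, and the class and field names are made up for illustration:

import time
from contextlib import contextmanager
from typing import Iterator


class PausableTimer:
    """Illustrative pause-aware timer; NOT DataHub's PerfTimer internals."""

    def __init__(self) -> None:
        self._elapsed = 0.0  # accumulated time, excluding paused spans
        self._started_at = 0.0  # perf_counter value at last (re)start

    def __enter__(self) -> "PausableTimer":
        self._started_at = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        self._elapsed += time.perf_counter() - self._started_at

    @contextmanager
    def pause(self) -> Iterator[None]:
        # Bank the time accrued so far, run the inner block off the clock,
        # then restart the clock; the finally-clause keeps the timer
        # consistent even if the paused block raises.
        self._elapsed += time.perf_counter() - self._started_at
        try:
            yield
        finally:
            self._started_at = time.perf_counter()

    def elapsed_seconds(self) -> float:
        return self._elapsed


# Usage mirroring _get_workunits_internal above:
get_queries_timer = PausableTimer()
sql_parsing_timer = PausableTimer()
with get_queries_timer as current_timer:
    for _ in range(3):
        time.sleep(0.01)  # stand-in for fetching a query
        with current_timer.pause():  # parsing is timed separately
            with sql_parsing_timer:
                time.sleep(0.02)  # stand-in for _parse_query
print(
    f"fetch: {get_queries_timer.elapsed_seconds():.2f}s, "
    f"parse: {sql_parsing_timer.elapsed_seconds():.2f}s"
)

The key invariant is that elapsed time is banked before the paused block runs and the clock restarts afterwards, which is what lets the four timers in the patch partition the usage-extraction work into non-overlapping phases.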
diff --git a/metadata-ingestion/tests/performance/databricks/test_unity.py b/metadata-ingestion/tests/performance/databricks/test_unity.py
index 6592ffe5198c16..ddd19804ba1841 100644
--- a/metadata-ingestion/tests/performance/databricks/test_unity.py
+++ b/metadata-ingestion/tests/performance/databricks/test_unity.py
@@ -40,7 +40,10 @@ def run_test():
     print("Data generated")
 
     config = UnityCatalogSourceConfig(
-        token="", workspace_url="http://localhost:1234", include_usage_statistics=False
+        token="",
+        workspace_url="http://localhost:1234",
+        include_usage_statistics=True,
+        include_hive_metastore=False,
     )
     ctx = PipelineContext(run_id="test")
     with patch(
@@ -61,7 +64,7 @@ def run_test():
     print(
         f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}"
     )
-    print(source.report.aspects)
+    print(source.report.as_string())
 
 
 if __name__ == "__main__":
diff --git a/metadata-ingestion/tests/performance/databricks/unity_proxy_mock.py b/metadata-ingestion/tests/performance/databricks/unity_proxy_mock.py
index cb3a1c165acdd4..307a7ba71ef839 100644
--- a/metadata-ingestion/tests/performance/databricks/unity_proxy_mock.py
+++ b/metadata-ingestion/tests/performance/databricks/unity_proxy_mock.py
@@ -1,7 +1,7 @@
 import uuid
 from collections import defaultdict
 from datetime import datetime, timezone
-from typing import Dict, Iterable, List
+from typing import Dict, Iterable, List, Optional
 
 from databricks.sdk.service.catalog import ColumnTypeName
 from databricks.sdk.service.sql import QueryStatementType
@@ -57,13 +57,15 @@ def assigned_metastore(self) -> Metastore:
             region=None,
         )
 
-    def catalogs(self, metastore: Metastore) -> Iterable[Catalog]:
+    def catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]:
         for container in self.seed_metadata.containers[1]:
-            if not container.parent or metastore.name != container.parent.name:
+            if not container.parent or (
+                metastore and metastore.name != container.parent.name
+            ):
                 continue
 
             yield Catalog(
-                id=f"{metastore.id}.{container.name}",
+                id=f"{metastore.id}.{container.name}" if metastore else container.name,
                 name=container.name,
                 metastore=metastore,
                 comment=None,
@@ -153,7 +155,7 @@ def query_history(
             executed_as_user_name=None,
         )
 
-    def table_lineage(self, table: Table) -> None:
+    def table_lineage(self, table: Table, include_entity_lineage: bool) -> None:
         pass
 
     def get_column_lineage(self, table: Table) -> None:
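After this patch, the per-phase timers live on UnityCatalogReport.usage_perf_report and print with the rest of the source report, which is why test_unity.py above switches to source.report.as_string(). A rough sketch of exercising the new report in isolation; that PerfTimer supports the context-manager protocol is shown by the diff itself, while the exact way Report.as_string() renders each timer's elapsed time is an assumption about the surrounding DataHub utilities:

import time

from datahub.ingestion.source.unity.report import UnityCatalogUsagePerfReport

perf = UnityCatalogUsagePerfReport()

# Time a fake "generate operations" phase the same way usage.py does.
with perf.gen_operation_timer:
    time.sleep(0.05)  # stand-in for building operation workunits

# Report subclasses serialize their fields, so the four timers appear
# alongside the usual counters in the printed report.
print(perf.as_string())  # assumption: timers render via Report.as_string()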