feat(ingest/databricks): add usage perf report #11480

Merged
14 changes: 13 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/unity/report.py
@@ -1,10 +1,19 @@
 from dataclasses import dataclass, field
 from typing import Optional, Tuple
 
-from datahub.ingestion.api.report import EntityFilterReport
+from datahub.ingestion.api.report import EntityFilterReport, Report
 from datahub.ingestion.source.sql.sql_generic_profiler import ProfilingSqlReport
 from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport
 from datahub.utilities.lossy_collections import LossyDict, LossyList
 from datahub.utilities.perf_timer import PerfTimer
 
+
+@dataclass
+class UnityCatalogUsagePerfReport(Report):
+    get_queries_timer: PerfTimer = field(default_factory=PerfTimer)
+    sql_parsing_timer: PerfTimer = field(default_factory=PerfTimer)
+    aggregator_add_event_timer: PerfTimer = field(default_factory=PerfTimer)
+    gen_operation_timer: PerfTimer = field(default_factory=PerfTimer)
+
+
 @dataclass
@@ -27,6 +36,9 @@ class UnityCatalogReport(IngestionStageReport, ProfilingSqlReport):
     num_queries_missing_table: int = 0  # Can be due to pattern filter
     num_queries_duplicate_table: int = 0
     num_queries_parsed_by_spark_plan: int = 0
+    usage_perf_report: UnityCatalogUsagePerfReport = field(
+        default_factory=UnityCatalogUsagePerfReport
+    )
 
     # Distinguish from Operations emitted for created / updated timestamps
     num_operational_stats_workunits_emitted: int = 0
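Note on the timer mechanics used by this PR: PerfTimer (from datahub.utilities.perf_timer) is used below both as a plain context manager and via pause(), which stops the clock while a sub-step runs so nested timers do not double-count. Here is a minimal stand-in sketch of just those semantics; SketchPerfTimer and total_seconds are illustrative names, not datahub's actual API:

import time
from contextlib import contextmanager
from dataclasses import dataclass, field


@dataclass
class SketchPerfTimer:
    # Accumulated wall-clock seconds while the timer is "running".
    total_seconds: float = 0.0
    _start: float = field(default=0.0, repr=False)

    def __enter__(self) -> "SketchPerfTimer":
        self._start = time.perf_counter()
        return self

    def __exit__(self, *exc) -> None:
        self.total_seconds += time.perf_counter() - self._start

    @contextmanager
    def pause(self):
        # Bank the elapsed time, hand control to a sub-step timed by a
        # different timer, then restart the clock on re-entry.
        self.total_seconds += time.perf_counter() - self._start
        try:
            yield
        finally:
            self._start = time.perf_counter()


outer = SketchPerfTimer()
inner = SketchPerfTimer()
with outer as t:
    time.sleep(0.01)            # counted against outer
    with t.pause():
        with inner:
            time.sleep(0.01)    # counted against inner only
    time.sleep(0.01)            # counted against outer again
print(outer.total_seconds, inner.total_seconds)  # outer ~0.02s, inner ~0.01s: the buckets do not overlap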
109 changes: 62 additions & 47 deletions metadata-ingestion/src/datahub/ingestion/source/unity/usage.py
@@ -81,20 +81,25 @@ def _get_workunits_internal(
             table_map[f"{ref.schema}.{ref.table}"].append(ref)
             table_map[ref.qualified_table_name].append(ref)
 
-        for query in self._get_queries():
-            self.report.num_queries += 1
-            table_info = self._parse_query(query, table_map)
-            if table_info is not None:
-                if self.config.include_operational_stats:
-                    yield from self._generate_operation_workunit(query, table_info)
-                for source_table in table_info.source_tables:
-                    self.usage_aggregator.aggregate_event(
-                        resource=source_table,
-                        start_time=query.start_time,
-                        query=query.query_text,
-                        user=query.user_name,
-                        fields=[],
-                    )
+        with self.report.usage_perf_report.get_queries_timer as current_timer:
+            for query in self._get_queries():
+                self.report.num_queries += 1
+                with current_timer.pause():
+                    table_info = self._parse_query(query, table_map)
+                if table_info is not None:
+                    if self.config.include_operational_stats:
+                        yield from self._generate_operation_workunit(
+                            query, table_info
+                        )
+                    for source_table in table_info.source_tables:
+                        with self.report.usage_perf_report.aggregator_add_event_timer:
+                            self.usage_aggregator.aggregate_event(
+                                resource=source_table,
+                                start_time=query.start_time,
+                                query=query.query_text,
+                                user=query.user_name,
+                                fields=[],
+                            )
 
         if not self.report.num_queries:
             logger.warning("No queries found in the given time range.")
@@ -117,29 +122,34 @@ def _get_workunits_internal(
     def _generate_operation_workunit(
         self, query: Query, table_info: QueryTableInfo
     ) -> Iterable[MetadataWorkUnit]:
-        if (
-            not query.statement_type
-            or query.statement_type not in OPERATION_STATEMENT_TYPES
-        ):
-            return None
+        with self.report.usage_perf_report.gen_operation_timer:
+            if (
+                not query.statement_type
+                or query.statement_type not in OPERATION_STATEMENT_TYPES
+            ):
+                return None
 
-        # Not sure about behavior when there are multiple target tables. This is a best attempt.
-        for target_table in table_info.target_tables:
-            operation_aspect = OperationClass(
-                timestampMillis=int(time.time() * 1000),
-                lastUpdatedTimestamp=int(query.end_time.timestamp() * 1000),
-                actor=(
-                    self.user_urn_builder(query.user_name) if query.user_name else None
-                ),
-                operationType=OPERATION_STATEMENT_TYPES[query.statement_type],
-                affectedDatasets=[
-                    self.table_urn_builder(table) for table in table_info.source_tables
-                ],
-            )
-            self.report.num_operational_stats_workunits_emitted += 1
-            yield MetadataChangeProposalWrapper(
-                entityUrn=self.table_urn_builder(target_table), aspect=operation_aspect
-            ).as_workunit()
+            # Not sure about behavior when there are multiple target tables. This is a best attempt.
+            for target_table in table_info.target_tables:
+                operation_aspect = OperationClass(
+                    timestampMillis=int(time.time() * 1000),
+                    lastUpdatedTimestamp=int(query.end_time.timestamp() * 1000),
+                    actor=(
+                        self.user_urn_builder(query.user_name)
+                        if query.user_name
+                        else None
+                    ),
+                    operationType=OPERATION_STATEMENT_TYPES[query.statement_type],
+                    affectedDatasets=[
+                        self.table_urn_builder(table)
+                        for table in table_info.source_tables
+                    ],
+                )
+                self.report.num_operational_stats_workunits_emitted += 1
+                yield MetadataChangeProposalWrapper(
+                    entityUrn=self.table_urn_builder(target_table),
+                    aspect=operation_aspect,
+                ).as_workunit()
 
     def _get_queries(self) -> Iterable[Query]:
         try:
@@ -153,18 +163,23 @@ def _get_queries(self) -> Iterable[Query]:
     def _parse_query(
         self, query: Query, table_map: TableMap
     ) -> Optional[QueryTableInfo]:
-        table_info = self._parse_query_via_lineage_runner(query.query_text)
-        if table_info is None and query.statement_type == QueryStatementType.SELECT:
-            table_info = self._parse_query_via_spark_sql_plan(query.query_text)
+        with self.report.usage_perf_report.sql_parsing_timer:
+            table_info = self._parse_query_via_lineage_runner(query.query_text)
+            if table_info is None and query.statement_type == QueryStatementType.SELECT:
+                table_info = self._parse_query_via_spark_sql_plan(query.query_text)
 
-        if table_info is None:
-            self.report.num_queries_dropped_parse_failure += 1
-            return None
-        else:
-            return QueryTableInfo(
-                source_tables=self._resolve_tables(table_info.source_tables, table_map),
-                target_tables=self._resolve_tables(table_info.target_tables, table_map),
-            )
+            if table_info is None:
+                self.report.num_queries_dropped_parse_failure += 1
+                return None
+            else:
+                return QueryTableInfo(
+                    source_tables=self._resolve_tables(
+                        table_info.source_tables, table_map
+                    ),
+                    target_tables=self._resolve_tables(
+                        table_info.target_tables, table_map
+                    ),
+                )
 
     def _parse_query_via_lineage_runner(self, query: str) -> Optional[StringTableInfo]:
         try:
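Taken together, the hunks above partition the usage-extraction loop: get_queries_timer covers the loop except the paused parse calls, sql_parsing_timer times both parse strategies, and aggregator_add_event_timer and gen_operation_timer isolate aggregation and operation generation within it. A short sketch of dumping that breakdown after a run; elapsed_seconds() is an assumed accessor name, so check datahub.utilities.perf_timer for the real one:

from dataclasses import fields


def summarize_usage_perf(report) -> None:
    # Walk the UnityCatalogUsagePerfReport dataclass fields generically and
    # print one line per phase timer.
    perf = report.usage_perf_report
    for f in fields(perf):
        timer = getattr(perf, f.name)
        print(f"{f.name}: {timer.elapsed_seconds():.2f}s")  # assumed accessor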
7 changes: 5 additions & 2 deletions metadata-ingestion/tests/performance/databricks/test_unity.py
@@ -40,7 +40,10 @@ def run_test():
     print("Data generated")
 
     config = UnityCatalogSourceConfig(
-        token="", workspace_url="http://localhost:1234", include_usage_statistics=False
+        token="",
+        workspace_url="http://localhost:1234",
+        include_usage_statistics=True,
+        include_hive_metastore=False,
     )
     ctx = PipelineContext(run_id="test")
     with patch(
@@ -61,7 +64,7 @@ def run_test():
     print(
         f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}"
     )
-    print(source.report.aspects)
+    print(source.report.as_string())
 
 
 if __name__ == "__main__":
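The harness keeps its plain-script entry point (the __main__ guard above), so it can be run by hand. A sketch of the invocation; the module path mirrors the file header, and its resolvability from a checkout is an assumption:

# From the metadata-ingestion directory of a datahub checkout:
#
#   python -m tests.performance.databricks.test_unity
#
from tests.performance.databricks.test_unity import run_test

run_test()

With include_usage_statistics=True the run now exercises the usage extractor, and the switch from report.aspects to report.as_string() prints the whole report, including the new usage_perf_report timers, next to the duration and peak-memory figures.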
@@ -1,7 +1,7 @@
 import uuid
 from collections import defaultdict
 from datetime import datetime, timezone
-from typing import Dict, Iterable, List
+from typing import Dict, Iterable, List, Optional
 
 from databricks.sdk.service.catalog import ColumnTypeName
 from databricks.sdk.service.sql import QueryStatementType
@@ -57,13 +57,15 @@ def assigned_metastore(self) -> Metastore:
             region=None,
         )
 
-    def catalogs(self, metastore: Metastore) -> Iterable[Catalog]:
+    def catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]:
         for container in self.seed_metadata.containers[1]:
-            if not container.parent or metastore.name != container.parent.name:
+            if not container.parent or (
+                metastore and metastore.name != container.parent.name
+            ):
                 continue
 
             yield Catalog(
-                id=f"{metastore.id}.{container.name}",
+                id=f"{metastore.id}.{container.name}" if metastore else container.name,
                 name=container.name,
                 metastore=metastore,
                 comment=None,
@@ -153,7 +155,7 @@ def query_history(
             executed_as_user_name=None,
         )
 
-    def table_lineage(self, table: Table) -> None:
+    def table_lineage(self, table: Table, include_entity_lineage: bool) -> None:
         pass
 
     def get_column_lineage(self, table: Table) -> None:
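The Optional[Metastore] loosening in this mock touches two expressions: the parent filter and the catalog id. Distilled into standalone helpers (hypothetical names, same logic) to make the None-handling visible:

from typing import Optional


def catalog_id(metastore_id: Optional[str], container_name: str) -> str:
    # Mirrors the mock's new id logic: prefix with the metastore id when one
    # exists, otherwise use the bare container name.
    return f"{metastore_id}.{container_name}" if metastore_id else container_name


def skip_container(parent_name: Optional[str], metastore_name: Optional[str]) -> bool:
    # Mirrors the new filter: skip containers with no parent and, when a
    # metastore is given, containers whose parent has a different name.
    return parent_name is None or (
        metastore_name is not None and metastore_name != parent_name
    )


assert catalog_id("m1", "cat") == "m1.cat"
assert catalog_id(None, "cat") == "cat"
assert skip_container(None, "m1") is True
assert skip_container("m1", "m1") is False
assert skip_container("m2", "m1") is True
assert skip_container("m1", None) is False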