From e1d2634889da27af0c05d5454005ba8a3262dfe8 Mon Sep 17 00:00:00 2001 From: Mayuri N Date: Fri, 21 Jun 2024 16:26:37 +0530 Subject: [PATCH] feat(ingest/databricks): include metadata for browse only tables also contains ug fix for hive metastore views --- metadata-ingestion/setup.py | 3 ++- .../source/unity/hive_metastore_proxy.py | 4 ++-- .../src/datahub/ingestion/source/unity/proxy.py | 16 +++++++++++----- 3 files changed, 15 insertions(+), 8 deletions(-) diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 04099e0a24b9ff..90a7faf6cc0507 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -276,7 +276,8 @@ databricks = { # 0.1.11 appears to have authentication issues with azure databricks - "databricks-sdk>=0.9.0", + # 0.22.0 has support for `include_browse` in metadata list apis + "databricks-sdk>=0.22.0", "pyspark~=3.3.0", "requests", # Version 2.4.0 includes sqlalchemy dialect, 2.8.0 includes some bug fixes diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py index c99fe3b09c5bb5..eea10d940bd1c8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/hive_metastore_proxy.py @@ -135,8 +135,8 @@ def get_table_names(self, schema_name: str) -> List[str]: def get_view_names(self, schema_name: str) -> List[str]: try: rows = self._execute_sql(f"SHOW VIEWS FROM `{schema_name}`") - # 3 columns - database, tableName, isTemporary - return [row.tableName for row in rows] + # 4 columns - namespace, viewName, isTemporary, isMaterialized + return [row.viewName for row in rows] except Exception as e: self.report.report_warning("Failed to get views for schema", schema_name) logger.warning( diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py index 1e90f3a044f425..5dea46d54457ab 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py @@ -109,7 +109,7 @@ def __init__( self.hive_metastore_proxy = hive_metastore_proxy def check_basic_connectivity(self) -> bool: - return bool(self._workspace_client.catalogs.list()) + return bool(self._workspace_client.catalogs.list(include_browse=True)) def assigned_metastore(self) -> Optional[Metastore]: response = self._workspace_client.metastores.summary() @@ -119,7 +119,7 @@ def catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]: if self.hive_metastore_proxy: yield self.hive_metastore_proxy.hive_metastore_catalog(metastore) - response = self._workspace_client.catalogs.list() + response = self._workspace_client.catalogs.list(include_browse=True) if not response: logger.info("Catalogs not found") return [] @@ -131,7 +131,9 @@ def catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]: def catalog( self, catalog_name: str, metastore: Optional[Metastore] ) -> Optional[Catalog]: - response = self._workspace_client.catalogs.get(catalog_name) + response = self._workspace_client.catalogs.get( + catalog_name, include_browse=True + ) if not response: logger.info(f"Catalog {catalog_name} not found") return None @@ -148,7 +150,9 @@ def schemas(self, catalog: Catalog) -> Iterable[Schema]: ): yield from self.hive_metastore_proxy.hive_metastore_schemas(catalog) return - response = self._workspace_client.schemas.list(catalog_name=catalog.name) + response = self._workspace_client.schemas.list( + catalog_name=catalog.name, include_browse=True + ) if not response: logger.info(f"Schemas not found for catalog {catalog.id}") return [] @@ -166,7 +170,9 @@ def tables(self, schema: Schema) -> Iterable[Table]: return with patch("databricks.sdk.service.catalog.TableInfo", TableInfoWithGeneration): response = self._workspace_client.tables.list( - catalog_name=schema.catalog.name, schema_name=schema.name + catalog_name=schema.catalog.name, + schema_name=schema.name, + include_browse=True, ) if not response: logger.info(f"Tables not found for schema {schema.id}")