Skip to content

Commit

Permalink
feat(ingest/databricks): include metadata for browse only tables
Browse files Browse the repository at this point in the history
also contains ug fix for hive metastore views
  • Loading branch information
mayurinehate committed Jun 24, 2024
1 parent 710e605 commit e1d2634
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 8 deletions.
3 changes: 2 additions & 1 deletion metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@

databricks = {
# 0.1.11 appears to have authentication issues with azure databricks
"databricks-sdk>=0.9.0",
# 0.22.0 has support for `include_browse` in metadata list apis
"databricks-sdk>=0.22.0",
"pyspark~=3.3.0",
"requests",
# Version 2.4.0 includes sqlalchemy dialect, 2.8.0 includes some bug fixes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ def get_table_names(self, schema_name: str) -> List[str]:
def get_view_names(self, schema_name: str) -> List[str]:
try:
rows = self._execute_sql(f"SHOW VIEWS FROM `{schema_name}`")
# 3 columns - database, tableName, isTemporary
return [row.tableName for row in rows]
# 4 columns - namespace, viewName, isTemporary, isMaterialized
return [row.viewName for row in rows]
except Exception as e:
self.report.report_warning("Failed to get views for schema", schema_name)
logger.warning(
Expand Down
16 changes: 11 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def __init__(
self.hive_metastore_proxy = hive_metastore_proxy

def check_basic_connectivity(self) -> bool:
return bool(self._workspace_client.catalogs.list())
return bool(self._workspace_client.catalogs.list(include_browse=True))

def assigned_metastore(self) -> Optional[Metastore]:
response = self._workspace_client.metastores.summary()
Expand All @@ -119,7 +119,7 @@ def catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]:
if self.hive_metastore_proxy:
yield self.hive_metastore_proxy.hive_metastore_catalog(metastore)

response = self._workspace_client.catalogs.list()
response = self._workspace_client.catalogs.list(include_browse=True)
if not response:
logger.info("Catalogs not found")
return []
Expand All @@ -131,7 +131,9 @@ def catalogs(self, metastore: Optional[Metastore]) -> Iterable[Catalog]:
def catalog(
self, catalog_name: str, metastore: Optional[Metastore]
) -> Optional[Catalog]:
response = self._workspace_client.catalogs.get(catalog_name)
response = self._workspace_client.catalogs.get(
catalog_name, include_browse=True
)
if not response:
logger.info(f"Catalog {catalog_name} not found")
return None
Expand All @@ -148,7 +150,9 @@ def schemas(self, catalog: Catalog) -> Iterable[Schema]:
):
yield from self.hive_metastore_proxy.hive_metastore_schemas(catalog)
return
response = self._workspace_client.schemas.list(catalog_name=catalog.name)
response = self._workspace_client.schemas.list(
catalog_name=catalog.name, include_browse=True
)
if not response:
logger.info(f"Schemas not found for catalog {catalog.id}")
return []
Expand All @@ -166,7 +170,9 @@ def tables(self, schema: Schema) -> Iterable[Table]:
return
with patch("databricks.sdk.service.catalog.TableInfo", TableInfoWithGeneration):
response = self._workspace_client.tables.list(
catalog_name=schema.catalog.name, schema_name=schema.name
catalog_name=schema.catalog.name,
schema_name=schema.name,
include_browse=True,
)
if not response:
logger.info(f"Tables not found for schema {schema.id}")
Expand Down

0 comments on commit e1d2634

Please sign in to comment.