diff --git a/metadata-ingestion/src/datahub/emitter/mcp.py b/metadata-ingestion/src/datahub/emitter/mcp.py index a1cdf93198b044..1356f58f05103c 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp.py +++ b/metadata-ingestion/src/datahub/emitter/mcp.py @@ -1,6 +1,6 @@ import dataclasses import json -from typing import TYPE_CHECKING, Optional, Tuple, Union +from typing import TYPE_CHECKING, List, Optional, Tuple, Union from datahub.emitter.aspect import ASPECT_MAP, TIMESERIES_ASPECT_MAP from datahub.emitter.serialization_helper import post_json_transform, pre_json_transform @@ -99,6 +99,12 @@ def __post_init__(self) -> None: f"aspectName {self.aspectName} does not match aspect type {type(self.aspect)} with name {self.aspect.get_aspect_name()}" ) + @classmethod + def construct_many( + cls, entityUrn: str, aspects: List[Optional[_Aspect]] + ) -> List["MetadataChangeProposalWrapper"]: + return [cls(entityUrn=entityUrn, aspect=aspect) for aspect in aspects if aspect] + def make_mcp(self) -> MetadataChangeProposalClass: serializedEntityKeyAspect: Union[None, GenericAspectClass] = None if isinstance(self.entityKeyAspect, DictWrapper): diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 0a0af54516ff28..3c9f3011204d47 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -15,7 +15,6 @@ PlatformKey, UnitySchemaKey, add_dataset_to_container, - add_domain_to_entity_wu, gen_containers, ) from datahub.ingestion.api.common import PipelineContext @@ -45,16 +44,15 @@ from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig from datahub.ingestion.source.unity.proxy import Catalog, Metastore, Schema from datahub.ingestion.source.unity.report import UnityCatalogReport -from datahub.metadata.com.linkedin.pegasus2avro.common import Status from datahub.metadata.com.linkedin.pegasus2avro.dataset import ( FineGrainedLineage, FineGrainedLineageUpstreamType, ViewProperties, ) from datahub.metadata.schema_classes import ( - ChangeTypeClass, DatasetLineageTypeClass, DatasetPropertiesClass, + DomainsClass, MySqlDDLClass, SchemaFieldClass, SchemaMetadataClass, @@ -67,6 +65,7 @@ from datahub.utilities.source_helpers import ( auto_stale_entity_removal, auto_status_aspect, + auto_workunit_reporter, ) logger: logging.Logger = logging.getLogger(__name__) @@ -95,24 +94,11 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource): - tables and column lineage """ - def get_platform_instance_id(self) -> str: - return self.config.platform_instance or self.platform - config: UnityCatalogSourceConfig unity_catalog_api_proxy: proxy.UnityCatalogApiProxy platform: str = "databricks" platform_instance_name: str - def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: - # emit metadata work unit to DataHub GMS - yield from self.process_metastores() - - def get_workunits(self) -> Iterable[MetadataWorkUnit]: - return auto_stale_entity_removal( - self.stale_entity_removal_handler, - auto_status_aspect(self.get_workunits_internal()), - ) - def get_report(self) -> SourceReport: return self.report @@ -171,10 +157,25 @@ def create(cls, config_dict, ctx): config = UnityCatalogSourceConfig.parse_obj(config_dict) return cls(ctx=ctx, config=config) + def get_platform_instance_id(self) -> str: + return self.config.platform_instance or self.platform + + def get_workunits(self) -> Iterable[MetadataWorkUnit]: + return auto_stale_entity_removal( + self.stale_entity_removal_handler, + auto_workunit_reporter( + self.report, + auto_status_aspect(self.get_workunits_internal()), + ), + ) + + def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: + yield from self.process_metastores() + def process_metastores(self) -> Iterable[MetadataWorkUnit]: for metastore in self.unity_catalog_api_proxy.metastores(): if not self.config.metastore_id_pattern.allowed(metastore.metastore_id): - self.report.filtered.append(f"{metastore.metastore_id}.*.*.*") + self.report.report_dropped(f"{metastore.metastore_id}.*.*.*") continue logger.info( f"Started to process metastore: {metastore.metastore_id} ({metastore.name})" @@ -191,7 +192,7 @@ def process_catalogs( ) -> Iterable[MetadataWorkUnit]: for catalog in self.unity_catalog_api_proxy.catalogs(metastore=metastore): if not self.config.catalog_pattern.allowed(catalog.name): - self.report.filtered.append(f"{catalog.name}.*.*") + self.report.report_dropped(f"{catalog.name}.*.*") continue yield from self.gen_catalog_containers(catalog) self.report.increment_scanned_catalog(1) @@ -200,7 +201,7 @@ def process_catalogs( def process_schemas(self, catalog: proxy.Catalog) -> Iterable[MetadataWorkUnit]: for schema in self.unity_catalog_api_proxy.schemas(catalog=catalog): if not self.config.schema_pattern.allowed(schema.name): - self.report.filtered.append(f"{catalog.name}.{schema.name}.*") + self.report.report_dropped(f"{catalog.name}.{schema.name}.*") continue yield from self.gen_schema_containers(schema) @@ -213,7 +214,7 @@ def process_tables(self, schema: proxy.Schema) -> Iterable[MetadataWorkUnit]: if not self.config.table_pattern.allowed( f"{table.schema.catalog.name}.{table.schema.name}.{table.name}" ): - self.report.filtered.append( + self.report.report_dropped( f"{schema.catalog.name}.{schema.name}.{table.name}" ) continue @@ -224,36 +225,43 @@ def process_tables(self, schema: proxy.Schema) -> Iterable[MetadataWorkUnit]: name=table.id, ) yield from self.add_table_to_dataset_container(dataset_urn, schema) - yield self._create_table_property_aspect_mcp(table) + + table_props = self._create_table_property_aspect(table) + + view_props = None if table.view_definition: - yield self._create_view_property_aspect(table) - yield self._create_table_sub_type_aspect_mcp(table) - yield self._create_schema_metadata_aspect_mcp(table) - status = Status(removed=False) - mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspect=status, - ) + view_props = self._create_view_property_aspect(table) - wu = MetadataWorkUnit(id=f"status-{dataset_urn}", mcp=mcp) - self.report.report_workunit(wu) - yield wu + sub_type = self._create_table_sub_type_aspect(table) + schema_metadata = self._create_schema_metadata_aspect(table) - yield from self._get_domain_wu( + domain = self._get_domain_aspect( dataset_name=str( f"{table.schema.catalog.name}.{table.schema.name}.{table.name}" - ), - entity_urn=dataset_urn, + ) ) if self.config.include_column_lineage: self.unity_catalog_api_proxy.get_column_lineage(table) - yield from self._generate_column_lineage_mcp(dataset_urn, table) + lineage = self._generate_column_lineage_aspect(dataset_urn, table) else: self.unity_catalog_api_proxy.table_lineage(table) - yield from self._generate_lineage_mcp(dataset_urn, table) + lineage = self._generate_lineage_aspect(dataset_urn, table) + + yield from [ + mcp.as_workunit() + for mcp in MetadataChangeProposalWrapper.construct_many( + entityUrn=dataset_urn, + aspects=[ + table_props, + view_props, + sub_type, + schema_metadata, + domain, + lineage, + ], + ) + ] self.report.report_entity_scanned( f"{table.schema.catalog.name}.{table.schema.name}.{table.name}", @@ -262,9 +270,9 @@ def process_tables(self, schema: proxy.Schema) -> Iterable[MetadataWorkUnit]: self.report.increment_scanned_table(1) - def _generate_column_lineage_mcp( + def _generate_column_lineage_aspect( self, dataset_urn: str, table: proxy.Table - ) -> Iterable[MetadataWorkUnit]: + ) -> Optional[UpstreamLineageClass]: upstreams: List[UpstreamClass] = [] finegrained_lineages: List[FineGrainedLineage] = [] for upstream in sorted(table.upstreams.keys()): @@ -293,22 +301,15 @@ def _generate_column_lineage_mcp( upstreams.append(upstream_table) if upstreams: - upstream_lineage = UpstreamLineageClass( + return UpstreamLineageClass( upstreams=upstreams, fineGrainedLineages=finegrained_lineages ) - mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspect=upstream_lineage, - ) - wu = MetadataWorkUnit(id=f"upstream-{dataset_urn}", mcp=mcp) - self.report.report_workunit(wu) - yield wu + else: + return None - def _generate_lineage_mcp( + def _generate_lineage_aspect( self, dataset_urn: str, table: proxy.Table - ) -> Iterable[MetadataWorkUnit]: + ) -> Optional[UpstreamLineageClass]: upstreams: List[UpstreamClass] = [] for upstream in sorted(table.upstreams.keys()): upstream_urn = make_dataset_urn_with_platform_instance( @@ -324,37 +325,21 @@ def _generate_lineage_mcp( upstreams.append(upstream_table) if upstreams: - upstream_lineage = UpstreamLineageClass(upstreams=upstreams) - mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspect=upstream_lineage, - ) - wu = MetadataWorkUnit(id=f"upstream-{dataset_urn}", mcp=mcp) - self.report.report_workunit(wu) - yield wu - - def _get_domain_wu( - self, - dataset_name: str, - entity_urn: str, - ) -> Iterable[MetadataWorkUnit]: + return UpstreamLineageClass(upstreams=upstreams) + else: + return None + + def _get_domain_aspect(self, dataset_name: str) -> Optional[DomainsClass]: domain_urn = self._gen_domain_urn(dataset_name) - if domain_urn: - wus = add_domain_to_entity_wu( - entity_urn=entity_urn, - domain_urn=domain_urn, - ) - for wu in wus: - self.report.report_workunit(wu) - yield wu + if not domain_urn: + return None + return DomainsClass(domains=[domain_urn]) def gen_schema_containers(self, schema: Schema) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(f"{schema.catalog.name}.{schema.name}") schema_container_key = self.gen_schema_key(schema) - container_workunits = gen_containers( + yield from gen_containers( container_key=schema_container_key, name=schema.name, sub_types=["Schema"], @@ -363,10 +348,6 @@ def gen_schema_containers(self, schema: Schema) -> Iterable[MetadataWorkUnit]: description=schema.comment, ) - for wu in container_workunits: - self.report.report_workunit(wu) - yield wu - def gen_metastore_containers( self, metastore: Metastore ) -> Iterable[MetadataWorkUnit]: @@ -374,7 +355,7 @@ def gen_metastore_containers( metastore_container_key = self.gen_metastore_key(metastore) - container_workunits = gen_containers( + yield from gen_containers( container_key=metastore_container_key, name=metastore.name, sub_types=["Metastore"], @@ -382,10 +363,6 @@ def gen_metastore_containers( description=metastore.comment, ) - for wu in container_workunits: - self.report.report_workunit(wu) - yield wu - def gen_catalog_containers(self, catalog: Catalog) -> Iterable[MetadataWorkUnit]: domain_urn = self._gen_domain_urn(catalog.name) @@ -393,7 +370,7 @@ def gen_catalog_containers(self, catalog: Catalog) -> Iterable[MetadataWorkUnit] catalog_container_key = self.gen_catalog_key(catalog) - container_workunits = gen_containers( + yield from gen_containers( container_key=catalog_container_key, name=catalog.name, sub_types=["Catalog"], @@ -402,10 +379,6 @@ def gen_catalog_containers(self, catalog: Catalog) -> Iterable[MetadataWorkUnit] description=catalog.comment, ) - for wu in container_workunits: - self.report.report_workunit(wu) - yield wu - def gen_schema_key(self, schema: Schema) -> PlatformKey: return UnitySchemaKey( unity_schema=schema.name, @@ -415,14 +388,14 @@ def gen_schema_key(self, schema: Schema) -> PlatformKey: metastore=schema.catalog.metastore.name, ) - def gen_metastore_key(self, metastore: Metastore) -> PlatformKey: + def gen_metastore_key(self, metastore: Metastore) -> MetastoreKey: return MetastoreKey( metastore=metastore.name, platform=self.platform, instance=self.config.platform_instance, ) - def gen_catalog_key(self, catalog: Catalog) -> PlatformKey: + def gen_catalog_key(self, catalog: Catalog) -> CatalogKey: return CatalogKey( catalog=catalog.name, metastore=catalog.metastore.name, @@ -445,20 +418,14 @@ def add_table_to_dataset_container( self, dataset_urn: str, schema: Schema ) -> Iterable[MetadataWorkUnit]: schema_container_key = self.gen_schema_key(schema) - container_workunits = add_dataset_to_container( + yield from add_dataset_to_container( container_key=schema_container_key, dataset_urn=dataset_urn, ) - for wu in container_workunits: - self.report.report_workunit(wu) - yield wu - def _create_table_property_aspect_mcp(self, table: proxy.Table) -> MetadataWorkUnit: - dataset_urn: str = make_dataset_urn_with_platform_instance( - platform=self.platform, - platform_instance=self.platform_instance_name, - name=table.id, - ) + def _create_table_property_aspect( + self, table: proxy.Table + ) -> DatasetPropertiesClass: custom_properties: dict = {} if table.storage_location is not None: custom_properties["storage_location"] = table.storage_location @@ -477,93 +444,37 @@ def _create_table_property_aspect_mcp(self, table: proxy.Table) -> MetadataWorkU custom_properties["updated_by"] = table.updated_by custom_properties["updated_at"] = str(table.updated_at) - mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspect=DatasetPropertiesClass( - name=table.name, - description=table.comment, - customProperties=custom_properties, - ), + return DatasetPropertiesClass( + name=table.name, + description=table.comment, + customProperties=custom_properties, ) - wu = MetadataWorkUnit(id=f"datasetProperties-{dataset_urn}", mcp=mcp) - self.report.report_workunit(wu) - - return wu - - def _create_table_sub_type_aspect_mcp(self, table: proxy.Table) -> MetadataWorkUnit: - dataset_urn: str = make_dataset_urn_with_platform_instance( - platform=self.platform, - platform_instance=self.platform_instance_name, - name=table.id, - ) - mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspect=SubTypesClass( - typeNames=["View" if table.table_type.lower() == "view" else "Table"] - ), + def _create_table_sub_type_aspect(self, table: proxy.Table) -> SubTypesClass: + return SubTypesClass( + typeNames=["View" if table.table_type.lower() == "view" else "Table"] ) - wu = MetadataWorkUnit(id=f"subType-{dataset_urn}", mcp=mcp) - self.report.report_workunit(wu) - - return wu - - def _create_view_property_aspect(self, table: proxy.Table) -> MetadataWorkUnit: - dataset_urn: str = make_dataset_urn_with_platform_instance( - platform=self.platform, - platform_instance=self.platform_instance_name, - name=table.id, - ) + def _create_view_property_aspect(self, table: proxy.Table) -> ViewProperties: assert table.view_definition - view_properties_aspect = ViewProperties( + return ViewProperties( materialized=False, viewLanguage="SQL", viewLogic=table.view_definition ) - mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspect=view_properties_aspect, - ) - wu = MetadataWorkUnit(id=f"view_properties-{dataset_urn}", mcp=mcp) - self.report.report_workunit(wu) - - return wu - def _create_schema_metadata_aspect_mcp( - self, table: proxy.Table - ) -> MetadataWorkUnit: + def _create_schema_metadata_aspect(self, table: proxy.Table) -> SchemaMetadataClass: schema_fields: List[SchemaFieldClass] = [] for column in table.columns: schema_fields.extend(self._create_schema_field(column)) - dataset_urn: str = make_dataset_urn_with_platform_instance( - platform=self.platform, - platform_instance=self.platform_instance_name, - name=table.id, + return SchemaMetadataClass( + schemaName=table.id, + platform=make_data_platform_urn(self.platform), + fields=schema_fields, + hash="", + version=0, + platformSchema=MySqlDDLClass(tableSchema=""), ) - mcp = MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspect=SchemaMetadataClass( - schemaName=table.id, - platform=make_data_platform_urn(self.platform), - fields=schema_fields, - hash="", - version=0, - platformSchema=MySqlDDLClass(tableSchema=""), - ), - ) - wu = MetadataWorkUnit(id=f"schema_metaclass-{dataset_urn}", mcp=mcp) - self.report.report_workunit(wu) - - return wu @staticmethod def _create_schema_field(column: proxy.Column) -> List[SchemaFieldClass]: diff --git a/metadata-ingestion/src/datahub/utilities/source_helpers.py b/metadata-ingestion/src/datahub/utilities/source_helpers.py index 2b771aa2a9f756..4f7d9ae309d4ac 100644 --- a/metadata-ingestion/src/datahub/utilities/source_helpers.py +++ b/metadata-ingestion/src/datahub/utilities/source_helpers.py @@ -1,6 +1,7 @@ from typing import Callable, Iterable, Optional, Set, Union from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.ingestion.api.source import SourceReport from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalHandler, @@ -87,3 +88,16 @@ def auto_stale_entity_removal( # Clean up stale entities. yield from stale_entity_removal_handler.gen_removed_entity_workunits() + + +def auto_workunit_reporter( + report: SourceReport, + stream: Iterable[MetadataWorkUnit], +) -> Iterable[MetadataWorkUnit]: + """ + Calls report.report_workunit() on each workunit. + """ + + for wu in stream: + report.report_workunit(wu) + yield wu