From 1595ec58c0e7132f645d52e018d906ee7ce33e70 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 8 Sep 2022 17:25:08 -0700 Subject: [PATCH] feat(ingest): add entity type inference to mcpw --- metadata-ingestion/src/datahub/emitter/mcp.py | 41 ++++++++++++++++--- .../ingestion/source/sql/sql_common.py | 14 ++----- .../integrations/great_expectations/action.py | 4 +- .../tests/unit/test_mcp_builder.py | 11 +++++ .../tests/unit/test_usage_common.py | 18 ++++++-- 5 files changed, 66 insertions(+), 22 deletions(-) diff --git a/metadata-ingestion/src/datahub/emitter/mcp.py b/metadata-ingestion/src/datahub/emitter/mcp.py index f80609b114f64f..72549e957cb87c 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp.py +++ b/metadata-ingestion/src/datahub/emitter/mcp.py @@ -1,6 +1,6 @@ import dataclasses import json -from typing import Union +from typing import TYPE_CHECKING, Union from datahub.emitter.serialization_helper import pre_json_transform from datahub.metadata.schema_classes import ( @@ -12,6 +12,12 @@ SystemMetadataClass, _Aspect, ) +from datahub.utilities.urns.urn import guess_entity_type + +if TYPE_CHECKING: + from datahub.ingestion.api.workunit import MetadataWorkUnit + +_ENTITY_TYPE_UNSET = "ENTITY_TYPE_UNSET" def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass: @@ -24,13 +30,12 @@ def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass: @dataclasses.dataclass class MetadataChangeProposalWrapper: - # TODO: remove manually aspectName from the codebase + # TODO: remove manually set aspectName from the codebase # TODO: (after) remove aspectName field from this class - # TODO: infer entityType from entityUrn - # TODO: set changeType's default to UPSERT + # TODO: remove manually set entityType from the codebase - entityType: str - changeType: Union[str, ChangeTypeClass] + entityType: str = _ENTITY_TYPE_UNSET + changeType: Union[str, ChangeTypeClass] = ChangeTypeClass.UPSERT entityUrn: Union[None, str] = None entityKeyAspect: Union[None, _Aspect] = None auditHeader: Union[None, KafkaAuditHeaderClass] = None @@ -39,6 +44,22 @@ class MetadataChangeProposalWrapper: systemMetadata: Union[None, SystemMetadataClass] = None def __post_init__(self) -> None: + if self.entityUrn and self.entityType == _ENTITY_TYPE_UNSET: + self.entityType = guess_entity_type(self.entityUrn) + elif self.entityUrn and self.entityType: + guessed_entity_type = guess_entity_type(self.entityUrn).lower() + # Entity type checking is actually case insensitive. + # Note that urns are case sensitive, but entity types are not. + if self.entityType.lower() != guessed_entity_type: + raise ValueError( + f"entityType {self.entityType} does not match the entity type {guessed_entity_type} from entityUrn {self.entityUrn}", + ) + elif self.entityType == _ENTITY_TYPE_UNSET: + raise ValueError("entityType must be set if entityUrn is not set") + + if not self.entityUrn and not self.entityKeyAspect: + raise ValueError("entityUrn or entityKeyAspect must be set") + if not self.aspectName and self.aspect: self.aspectName = self.aspect.get_aspect_name() elif ( @@ -88,3 +109,11 @@ def to_obj(self, tuples: bool = False) -> dict: # TODO: add a from_obj method. Implementing this would require us to # inspect the aspectName field to determine which class to deserialize into. + + def as_workunit(self) -> "MetadataWorkUnit": + from datahub.ingestion.api.workunit import MetadataWorkUnit + + # TODO: If the aspect is a timeseries aspect, we should do some + # customization of the ID here. + + return MetadataWorkUnit(id=f"{self.entityUrn}-{self.aspectName}", mcp=self) diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index cfc5d815fc57d3..fee1def3b3ddfa 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -1291,16 +1291,10 @@ def _process_view( view_properties_aspect = ViewPropertiesClass( materialized=False, viewLanguage="SQL", viewLogic=view_definition_string ) - view_properties_wu = MetadataWorkUnit( - id=f"{view}-viewProperties", - mcp=MetadataChangeProposalWrapper( - entityType="dataset", - changeType=ChangeTypeClass.UPSERT, - entityUrn=dataset_urn, - aspectName="viewProperties", - aspect=view_properties_aspect, - ), - ) + view_properties_wu = MetadataChangeProposalWrapper( + entityUrn=dataset_urn, + aspect=view_properties_aspect, + ).as_workunit() self.report.report_workunit(view_properties_wu) yield view_properties_wu diff --git a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py index 568927dae1521b..85dbf1b54b742e 100644 --- a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py +++ b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py @@ -188,7 +188,7 @@ def _run( aspect=assertionResult, ) - # Emit Result! (timseries aspect) + # Emit Result! (timeseries aspect) emitter.emit_mcp(dataset_assertionResult_mcp) logger.info("Metadata sent to datahub.") result = "DataHub notification succeeded" @@ -196,7 +196,7 @@ def _run( result = "DataHub notification failed" if self.graceful_exceptions: logger.error(e) - logger.info("Supressing error because graceful_exceptions is set") + logger.info("Suppressing error because graceful_exceptions is set") else: raise diff --git a/metadata-ingestion/tests/unit/test_mcp_builder.py b/metadata-ingestion/tests/unit/test_mcp_builder.py index 12c96b2f604b45..0c851fe37f9141 100644 --- a/metadata-ingestion/tests/unit/test_mcp_builder.py +++ b/metadata-ingestion/tests/unit/test_mcp_builder.py @@ -1,5 +1,7 @@ import datahub.emitter.mcp_builder as builder +import datahub.metadata.schema_classes as models from datahub.emitter.mce_builder import datahub_guid +from datahub.emitter.mcp import MetadataChangeProposalWrapper def test_guid_generator(): @@ -42,3 +44,12 @@ def test_guid_generators(): guid = key.guid() assert guid == guid_datahub + + +def test_mcpw_inference(): + mcpw = MetadataChangeProposalWrapper( + entityUrn="urn:li:dataset:(urn:li:dataPlatform:bigquery,harshal-playground-306419.test_schema.excess_deaths_derived,PROD)", + aspect=models.DomainsClass(domains=["urn:li:domain:health"]), + ) + assert mcpw.entityType == "dataset" + assert mcpw.aspectName == "domains" diff --git a/metadata-ingestion/tests/unit/test_usage_common.py b/metadata-ingestion/tests/unit/test_usage_common.py index 5ab996fd3016b6..91b9e1a551d6a0 100644 --- a/metadata-ingestion/tests/unit/test_usage_common.py +++ b/metadata-ingestion/tests/unit/test_usage_common.py @@ -5,6 +5,7 @@ from datahub.configuration.common import AllowDenyPattern from datahub.configuration.time_window_config import BucketDuration, get_time_bucket +from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance from datahub.emitter.mcp import MetadataChangeProposalWrapper from datahub.ingestion.api.workunit import MetadataWorkUnit from datahub.ingestion.source.usage.usage_common import ( @@ -18,6 +19,15 @@ _TestAggregatedDataset = GenericAggregatedDataset[_TestTableRef] +def _simple_urn_builder(resource): + return make_dataset_urn_with_platform_instance( + "snowflake", + resource.lower(), + "snowflake-dev", + "DEV", + ) + + def test_add_one_query_without_columns(): test_email = "test_email@test.com" test_query = "select * from test" @@ -149,7 +159,7 @@ def test_make_usage_workunit(): ) wu: MetadataWorkUnit = ta.make_usage_workunit( bucket_duration=BucketDuration.DAY, - urn_builder=lambda x: x, + urn_builder=_simple_urn_builder, top_n_queries=10, format_sql_queries=False, include_top_n_queries=True, @@ -181,7 +191,7 @@ def test_query_formatting(): ) wu: MetadataWorkUnit = ta.make_usage_workunit( bucket_duration=BucketDuration.DAY, - urn_builder=lambda x: x, + urn_builder=_simple_urn_builder, top_n_queries=10, format_sql_queries=True, include_top_n_queries=True, @@ -212,7 +222,7 @@ def test_query_trimming(): ) wu: MetadataWorkUnit = ta.make_usage_workunit( bucket_duration=BucketDuration.DAY, - urn_builder=lambda x: x, + urn_builder=_simple_urn_builder, top_n_queries=top_n_queries, format_sql_queries=False, include_top_n_queries=True, @@ -248,7 +258,7 @@ def test_make_usage_workunit_include_top_n_queries(): ) wu: MetadataWorkUnit = ta.make_usage_workunit( bucket_duration=BucketDuration.DAY, - urn_builder=lambda x: x, + urn_builder=_simple_urn_builder, top_n_queries=10, format_sql_queries=False, include_top_n_queries=False,