Skip to content

Commit

Permalink
feat(ingest): add entity type inference to mcpw
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 committed Sep 9, 2022
1 parent e7b33c5 commit 1595ec5
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 22 deletions.
41 changes: 35 additions & 6 deletions metadata-ingestion/src/datahub/emitter/mcp.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import dataclasses
import json
from typing import Union
from typing import TYPE_CHECKING, Union

from datahub.emitter.serialization_helper import pre_json_transform
from datahub.metadata.schema_classes import (
Expand All @@ -12,6 +12,12 @@
SystemMetadataClass,
_Aspect,
)
from datahub.utilities.urns.urn import guess_entity_type

if TYPE_CHECKING:
from datahub.ingestion.api.workunit import MetadataWorkUnit

_ENTITY_TYPE_UNSET = "ENTITY_TYPE_UNSET"


def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass:
Expand All @@ -24,13 +30,12 @@ def _make_generic_aspect(codegen_obj: DictWrapper) -> GenericAspectClass:

@dataclasses.dataclass
class MetadataChangeProposalWrapper:
# TODO: remove manually aspectName from the codebase
# TODO: remove manually set aspectName from the codebase
# TODO: (after) remove aspectName field from this class
# TODO: infer entityType from entityUrn
# TODO: set changeType's default to UPSERT
# TODO: remove manually set entityType from the codebase

entityType: str
changeType: Union[str, ChangeTypeClass]
entityType: str = _ENTITY_TYPE_UNSET
changeType: Union[str, ChangeTypeClass] = ChangeTypeClass.UPSERT
entityUrn: Union[None, str] = None
entityKeyAspect: Union[None, _Aspect] = None
auditHeader: Union[None, KafkaAuditHeaderClass] = None
Expand All @@ -39,6 +44,22 @@ class MetadataChangeProposalWrapper:
systemMetadata: Union[None, SystemMetadataClass] = None

def __post_init__(self) -> None:
if self.entityUrn and self.entityType == _ENTITY_TYPE_UNSET:
self.entityType = guess_entity_type(self.entityUrn)
elif self.entityUrn and self.entityType:
guessed_entity_type = guess_entity_type(self.entityUrn).lower()
# Entity type checking is actually case insensitive.
# Note that urns are case sensitive, but entity types are not.
if self.entityType.lower() != guessed_entity_type:
raise ValueError(
f"entityType {self.entityType} does not match the entity type {guessed_entity_type} from entityUrn {self.entityUrn}",
)
elif self.entityType == _ENTITY_TYPE_UNSET:
raise ValueError("entityType must be set if entityUrn is not set")

if not self.entityUrn and not self.entityKeyAspect:
raise ValueError("entityUrn or entityKeyAspect must be set")

if not self.aspectName and self.aspect:
self.aspectName = self.aspect.get_aspect_name()
elif (
Expand Down Expand Up @@ -88,3 +109,11 @@ def to_obj(self, tuples: bool = False) -> dict:

# TODO: add a from_obj method. Implementing this would require us to
# inspect the aspectName field to determine which class to deserialize into.

def as_workunit(self) -> "MetadataWorkUnit":
    """Wrap this MCP in a MetadataWorkUnit with an auto-generated id."""
    # Imported lazily to avoid a circular import between the emitter
    # and the ingestion workunit modules.
    from datahub.ingestion.api.workunit import MetadataWorkUnit

    # TODO: If the aspect is a timeseries aspect, we should do some
    # customization of the ID here.
    workunit_id = f"{self.entityUrn}-{self.aspectName}"
    return MetadataWorkUnit(id=workunit_id, mcp=self)
14 changes: 4 additions & 10 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -1291,16 +1291,10 @@ def _process_view(
view_properties_aspect = ViewPropertiesClass(
materialized=False, viewLanguage="SQL", viewLogic=view_definition_string
)
view_properties_wu = MetadataWorkUnit(
id=f"{view}-viewProperties",
mcp=MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
entityUrn=dataset_urn,
aspectName="viewProperties",
aspect=view_properties_aspect,
),
)
view_properties_wu = MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=view_properties_aspect,
).as_workunit()
self.report.report_workunit(view_properties_wu)
yield view_properties_wu

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,15 +188,15 @@ def _run(
aspect=assertionResult,
)

# Emit Result! (timseries aspect)
# Emit Result! (timeseries aspect)
emitter.emit_mcp(dataset_assertionResult_mcp)
logger.info("Metadata sent to datahub.")
result = "DataHub notification succeeded"
except Exception as e:
result = "DataHub notification failed"
if self.graceful_exceptions:
logger.error(e)
logger.info("Supressing error because graceful_exceptions is set")
logger.info("Suppressing error because graceful_exceptions is set")
else:
raise

Expand Down
11 changes: 11 additions & 0 deletions metadata-ingestion/tests/unit/test_mcp_builder.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import datahub.emitter.mcp_builder as builder
import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import datahub_guid
from datahub.emitter.mcp import MetadataChangeProposalWrapper


def test_guid_generator():
Expand Down Expand Up @@ -42,3 +44,12 @@ def test_guid_generators():

guid = key.guid()
assert guid == guid_datahub


def test_mcpw_inference():
    """entityType and aspectName should be inferred from entityUrn/aspect."""
    urn = (
        "urn:li:dataset:(urn:li:dataPlatform:bigquery,"
        "harshal-playground-306419.test_schema.excess_deaths_derived,PROD)"
    )
    wrapper = MetadataChangeProposalWrapper(
        entityUrn=urn,
        aspect=models.DomainsClass(domains=["urn:li:domain:health"]),
    )
    assert (wrapper.entityType, wrapper.aspectName) == ("dataset", "domains")
18 changes: 14 additions & 4 deletions metadata-ingestion/tests/unit/test_usage_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.time_window_config import BucketDuration, get_time_bucket
from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.usage.usage_common import (
Expand All @@ -18,6 +19,15 @@
_TestAggregatedDataset = GenericAggregatedDataset[_TestTableRef]


def _simple_urn_builder(resource):
    """Build a snowflake dataset urn for *resource*, mimicking a real source's
    urn builder (lowercased name, fixed platform instance and env)."""
    lowered_name = resource.lower()
    return make_dataset_urn_with_platform_instance(
        "snowflake", lowered_name, "snowflake-dev", "DEV"
    )


def test_add_one_query_without_columns():
test_email = "[email protected]"
test_query = "select * from test"
Expand Down Expand Up @@ -149,7 +159,7 @@ def test_make_usage_workunit():
)
wu: MetadataWorkUnit = ta.make_usage_workunit(
bucket_duration=BucketDuration.DAY,
urn_builder=lambda x: x,
urn_builder=_simple_urn_builder,
top_n_queries=10,
format_sql_queries=False,
include_top_n_queries=True,
Expand Down Expand Up @@ -181,7 +191,7 @@ def test_query_formatting():
)
wu: MetadataWorkUnit = ta.make_usage_workunit(
bucket_duration=BucketDuration.DAY,
urn_builder=lambda x: x,
urn_builder=_simple_urn_builder,
top_n_queries=10,
format_sql_queries=True,
include_top_n_queries=True,
Expand Down Expand Up @@ -212,7 +222,7 @@ def test_query_trimming():
)
wu: MetadataWorkUnit = ta.make_usage_workunit(
bucket_duration=BucketDuration.DAY,
urn_builder=lambda x: x,
urn_builder=_simple_urn_builder,
top_n_queries=top_n_queries,
format_sql_queries=False,
include_top_n_queries=True,
Expand Down Expand Up @@ -248,7 +258,7 @@ def test_make_usage_workunit_include_top_n_queries():
)
wu: MetadataWorkUnit = ta.make_usage_workunit(
bucket_duration=BucketDuration.DAY,
urn_builder=lambda x: x,
urn_builder=_simple_urn_builder,
top_n_queries=10,
format_sql_queries=False,
include_top_n_queries=False,
Expand Down

0 comments on commit 1595ec5

Please sign in to comment.