From 7b1431ac6de5f5a62be9a373274fd03e1609a7ce Mon Sep 17 00:00:00 2001
From: Harshal Sheth
Date: Thu, 19 Sep 2024 08:22:50 -0700
Subject: [PATCH] feat(ingest): add `get_entity_as_mcps` method to client (#11425)

---
 .../src/datahub/ingestion/graph/client.py     | 65 ++++++++++++++++---
 1 file changed, 54 insertions(+), 11 deletions(-)

diff --git a/metadata-ingestion/src/datahub/ingestion/graph/client.py b/metadata-ingestion/src/datahub/ingestion/graph/client.py
index c783d9a35814b3..0fdb7bb537457d 100644
--- a/metadata-ingestion/src/datahub/ingestion/graph/client.py
+++ b/metadata-ingestion/src/datahub/ingestion/graph/client.py
@@ -527,6 +527,56 @@ def get_aspects_for_entity(
 
         return result
 
+    def get_entity_as_mcps(
+        self, entity_urn: str, aspects: Optional[List[str]] = None
+    ) -> List[MetadataChangeProposalWrapper]:
+        """Get all non-timeseries aspects for an entity.
+
+        By formatting the entity's aspects as MCPWs, we can also include SystemMetadata.
+
+        Warning: Do not use this method to determine if an entity exists! This method will always return
+        something, even if the entity doesn't actually exist in DataHub.
+
+        Args:
+            entity_urn: The urn of the entity
+            aspects: Optional list of aspect names being requested (e.g. ["schemaMetadata", "datasetProperties"])
+
+        Returns:
+            A list of MCPWs, one per recognized aspect. systemMetadata is None for
+            aspects whose response entry did not include it.
+        """
+
+        response_json = self.get_entity_raw(entity_urn, aspects)
+
+        # Now, we parse the response into proper aspect objects.
+        results: List[MetadataChangeProposalWrapper] = []
+        for aspect_name, aspect_json in response_json.get("aspects", {}).items():
+            aspect_type = ASPECT_NAME_MAP.get(aspect_name)
+            if aspect_type is None:
+                logger.warning(f"Ignoring unknown aspect type {aspect_name}")
+                continue
+
+            post_json_obj = post_json_transform(aspect_json)
+            aspect_value = aspect_type.from_obj(post_json_obj["value"])
+
+            # Tolerate a response entry without systemMetadata rather than raising
+            # a KeyError: get_entity_semityped is built on this method and only
+            # needs the aspect value.
+            system_metadata_raw = post_json_obj.get("systemMetadata")
+            system_metadata: Optional[SystemMetadataClass] = None
+            if system_metadata_raw:
+                system_metadata = SystemMetadataClass.from_obj(system_metadata_raw)
+
+            mcpw = MetadataChangeProposalWrapper(
+                entityUrn=entity_urn,
+                aspect=aspect_value,
+                systemMetadata=system_metadata,
+            )
+
+            results.append(mcpw)
+
+        return results
+
     def get_entity_semityped(
         self, entity_urn: str, aspects: Optional[List[str]] = None
     ) -> AspectBag:
@@ -545,19 +595,12 @@ def get_entity_semityped(
             not be present in the dictionary. The entity's key aspect will always be present.
         """
 
-        response_json = self.get_entity_raw(entity_urn, aspects)
+        mcps = self.get_entity_as_mcps(entity_urn, aspects)
 
-        # Now, we parse the response into proper aspect objects.
         result: AspectBag = {}
-        for aspect_name, aspect_json in response_json.get("aspects", {}).items():
-            aspect_type = ASPECT_NAME_MAP.get(aspect_name)
-            if aspect_type is None:
-                logger.warning(f"Ignoring unknown aspect type {aspect_name}")
-                continue
-
-            post_json_obj = post_json_transform(aspect_json)
-            aspect_value = aspect_type.from_obj(post_json_obj["value"])
-            result[aspect_name] = aspect_value  # type: ignore
+        for mcp in mcps:
+            if mcp.aspect:
+                result[mcp.aspect.get_aspect_name()] = mcp.aspect  # type: ignore
 
         return result
 