Skip to content

Commit

Permalink
feat(ingest): add get_entity_as_mcps method to client (datahub-proj…
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored and sleeperdeep committed Dec 17, 2024
1 parent 26ba13f commit 7b1431a
Showing 1 changed file with 48 additions and 11 deletions.
59 changes: 48 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,50 @@ def get_aspects_for_entity(

return result

def get_entity_as_mcps(
    self, entity_urn: str, aspects: Optional[List[str]] = None
) -> List[MetadataChangeProposalWrapper]:
    """Get all non-timeseries aspects for an entity, formatted as MCPWs.

    By formatting the entity's aspects as MCPWs, we can also include
    SystemMetadata alongside each aspect value.

    Warning: Do not use this method to determine if an entity exists! This
    method will always return something, even if the entity doesn't actually
    exist in DataHub.

    Args:
        entity_urn: The urn of the entity.
        aspects: Optional list of aspect names being requested
            (e.g. ["schemaMetadata", "datasetProperties"]).

    Returns:
        A list of MCPWs, one per aspect in the response whose name is found
        in ASPECT_NAME_MAP. Aspects with unknown names are skipped with a
        warning. An MCPW's systemMetadata is None when the response carries
        no system metadata for that aspect.
    """

    response_json = self.get_entity_raw(entity_urn, aspects)

    # Now, we parse the response into proper aspect objects.
    results: List[MetadataChangeProposalWrapper] = []
    for aspect_name, aspect_json in response_json.get("aspects", {}).items():
        aspect_type = ASPECT_NAME_MAP.get(aspect_name)
        if aspect_type is None:
            logger.warning(f"Ignoring unknown aspect type {aspect_name}")
            continue

        post_json_obj = post_json_transform(aspect_json)
        aspect_value = aspect_type.from_obj(post_json_obj["value"])

        # System metadata is not guaranteed to be present on every aspect in
        # the response; a missing field must not abort parsing of the whole
        # entity, so fall back to None instead of raising KeyError.
        system_metadata_raw = post_json_obj.get("systemMetadata")
        system_metadata = None
        if system_metadata_raw is not None:
            system_metadata = SystemMetadataClass.from_obj(system_metadata_raw)

        results.append(
            MetadataChangeProposalWrapper(
                entityUrn=entity_urn,
                aspect=aspect_value,
                systemMetadata=system_metadata,
            )
        )

    return results

def get_entity_semityped(
self, entity_urn: str, aspects: Optional[List[str]] = None
) -> AspectBag:
Expand All @@ -545,19 +589,12 @@ def get_entity_semityped(
not be present in the dictionary. The entity's key aspect will always be present.
"""

response_json = self.get_entity_raw(entity_urn, aspects)
mcps = self.get_entity_as_mcps(entity_urn, aspects)

# Now, we parse the response into proper aspect objects.
result: AspectBag = {}
for aspect_name, aspect_json in response_json.get("aspects", {}).items():
aspect_type = ASPECT_NAME_MAP.get(aspect_name)
if aspect_type is None:
logger.warning(f"Ignoring unknown aspect type {aspect_name}")
continue

post_json_obj = post_json_transform(aspect_json)
aspect_value = aspect_type.from_obj(post_json_obj["value"])
result[aspect_name] = aspect_value # type: ignore
for mcp in mcps:
if mcp.aspect:
result[mcp.aspect.get_aspect_name()] = mcp.aspect # type: ignore

return result

Expand Down

0 comments on commit 7b1431a

Please sign in to comment.