Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest): add get_entity_as_mcps method to client #11425

Merged
merged 1 commit into from
Sep 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 48 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/graph/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,50 @@ def get_aspects_for_entity(

return result

def get_entity_as_mcps(
    self, entity_urn: str, aspects: Optional[List[str]] = None
) -> List[MetadataChangeProposalWrapper]:
    """Get all non-timeseries aspects for an entity.

    By formatting the entity's aspects as MCPWs, we can also include SystemMetadata.

    Warning: Do not use this method to determine if an entity exists! This method will always return
    something, even if the entity doesn't actually exist in DataHub.

    Args:
        entity_urn: The urn of the entity
        aspects: Optional list of aspect names being requested (e.g. ["schemaMetadata", "datasetProperties"])

    Returns:
        A list of MCPWs, one per recognized aspect in the response. Aspects whose
        name is not present in ASPECT_NAME_MAP are skipped with a warning. If an
        aspect entry carries no system metadata, the MCPW's systemMetadata is None.
    """

    response_json = self.get_entity_raw(entity_urn, aspects)

    # Now, we parse the response into proper aspect objects.
    results: List[MetadataChangeProposalWrapper] = []
    for aspect_name, aspect_json in response_json.get("aspects", {}).items():
        aspect_type = ASPECT_NAME_MAP.get(aspect_name)
        if aspect_type is None:
            logger.warning(f"Ignoring unknown aspect type {aspect_name}")
            continue

        post_json_obj = post_json_transform(aspect_json)
        aspect_value = aspect_type.from_obj(post_json_obj["value"])

        # The system metadata is treated as optional: a hard subscript here would
        # raise KeyError for any aspect entry that lacks it and abort the whole
        # fetch, including for callers that only need the aspect values.
        # NOTE(review): relies on MetadataChangeProposalWrapper accepting
        # systemMetadata=None - confirm against the wrapper's definition.
        system_metadata_raw = post_json_obj.get("systemMetadata")
        system_metadata = (
            SystemMetadataClass.from_obj(system_metadata_raw)
            if system_metadata_raw
            else None
        )

        results.append(
            MetadataChangeProposalWrapper(
                entityUrn=entity_urn,
                aspect=aspect_value,
                systemMetadata=system_metadata,
            )
        )

    return results

def get_entity_semityped(
self, entity_urn: str, aspects: Optional[List[str]] = None
) -> AspectBag:
Expand All @@ -545,19 +589,12 @@ def get_entity_semityped(
not be present in the dictionary. The entity's key aspect will always be present.
"""

response_json = self.get_entity_raw(entity_urn, aspects)
mcps = self.get_entity_as_mcps(entity_urn, aspects)

# Now, we parse the response into proper aspect objects.
result: AspectBag = {}
for aspect_name, aspect_json in response_json.get("aspects", {}).items():
aspect_type = ASPECT_NAME_MAP.get(aspect_name)
if aspect_type is None:
logger.warning(f"Ignoring unknown aspect type {aspect_name}")
continue

post_json_obj = post_json_transform(aspect_json)
aspect_value = aspect_type.from_obj(post_json_obj["value"])
result[aspect_name] = aspect_value # type: ignore
for mcp in mcps:
if mcp.aspect:
result[mcp.aspect.get_aspect_name()] = mcp.aspect # type: ignore

return result

Expand Down
Loading