Skip to content

Commit

Permalink
chore(ingest): remove deprecated calls to Urn.create_from_string (#11983
Browse files Browse the repository at this point in the history
)
  • Loading branch information
hsheth2 authored Dec 2, 2024
1 parent 411e1a3 commit ce6474d
Show file tree
Hide file tree
Showing 20 changed files with 76 additions and 160 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def fqn(self) -> str:
return (
self.qualified_name
or self.id
or Urn.create_from_string(self.urn).get_entity_id()[0]
or Urn.from_string(self.urn).get_entity_id()[0]
)

@validator("urn", pre=True, always=True)
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/cli/put_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def platform(
"""

if name.startswith(f"urn:li:{DataPlatformUrn.ENTITY_TYPE}"):
platform_urn = DataPlatformUrn.create_from_string(name)
platform_urn = DataPlatformUrn.from_string(name)
platform_name = platform_urn.get_entity_id_as_string()
else:
platform_name = name.lower()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def _get_owner_urn(maybe_urn: str) -> str:

def _abort_if_non_existent_urn(graph: DataHubGraph, urn: str, operation: str) -> None:
try:
parsed_urn: Urn = Urn.create_from_string(urn)
parsed_urn: Urn = Urn.from_string(urn)
entity_type = parsed_urn.get_type()
except Exception:
click.secho(f"Provided urn {urn} does not seem valid", fg="red")
Expand Down
43 changes: 43 additions & 0 deletions metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import json
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Sequence, Union

from datahub.emitter.aspect import JSON_PATCH_CONTENT_TYPE
from datahub.emitter.serialization_helper import pre_json_transform
from datahub.metadata.schema_classes import (
AuditStampClass,
ChangeTypeClass,
EdgeClass,
GenericAspectClass,
KafkaAuditHeaderClass,
MetadataChangeProposalClass,
SystemMetadataClass,
)
from datahub.metadata.urns import Urn
from datahub.utilities.urns.urn import guess_entity_type


Expand Down Expand Up @@ -89,3 +93,42 @@ def build(self) -> Iterable[MetadataChangeProposalClass]:
)
for aspect_name, patches in self.patches.items()
]

@classmethod
def _mint_auditstamp(cls, message: Optional[str] = None) -> AuditStampClass:
"""
Creates an AuditStampClass instance with the current timestamp and other default values.
Args:
message: The message associated with the audit stamp (optional).
Returns:
An instance of AuditStampClass.
"""
return AuditStampClass(
time=int(time.time() * 1000.0),
actor="urn:li:corpuser:datahub",
message=message,
)

@classmethod
def _ensure_urn_type(
cls, entity_type: str, edges: List[EdgeClass], context: str
) -> None:
"""
Ensures that the destination URNs in the given edges have the specified entity type.
Args:
entity_type: The entity type to check against.
edges: A list of Edge objects.
context: The context or description of the operation.
Raises:
ValueError: If any of the destination URNs is not of the specified entity type.
"""
for e in edges:
urn = Urn.from_string(e.destinationUrn)
if not urn.entity_type == entity_type:
raise ValueError(
f"{context}: {e.destinationUrn} is not of type {entity_type}"
)
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ def from_string_name(cls, ref: str) -> "BigQueryTableRef":
@classmethod
def from_urn(cls, urn: str) -> "BigQueryTableRef":
"""Raises: ValueError if urn is not a valid BigQuery table URN."""
dataset_urn = DatasetUrn.create_from_string(urn)
dataset_urn = DatasetUrn.from_string(urn)
split = dataset_urn.name.rsplit(".", 3)
if len(split) == 3:
project, dataset, table = split
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

is_resource_row: bool = not row["subresource"]
entity_urn = row["resource"]
entity_type = Urn.create_from_string(row["resource"]).get_type()
entity_type = Urn.from_string(row["resource"]).get_type()

term_associations: List[
GlossaryTermAssociationClass
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def collapse_name(name: str, collapse_urns: CollapseUrns) -> str:
def collapse_urn(urn: str, collapse_urns: CollapseUrns) -> str:
if len(collapse_urns.urns_suffix_regex) == 0:
return urn
urn_obj = DatasetUrn.create_from_string(urn)
urn_obj = DatasetUrn.from_string(urn)
name = collapse_name(name=urn_obj.get_dataset_name(), collapse_urns=collapse_urns)
data_platform_urn = urn_obj.get_data_platform_urn()
return str(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def __init__(
def delete_entity(self, urn: str) -> None:
assert self.ctx.graph

entity_urn = Urn.create_from_string(urn)
entity_urn = Urn.from_string(urn)
self.report.num_soft_deleted_entity_removed += 1
self.report.num_soft_deleted_entity_removed_by_type[entity_urn.entity_type] = (
self.report.num_soft_deleted_entity_removed_by_type.get(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def handle_end_of_stream(
logger.debug("Generating tags")

for tag_association in self.processed_tags.values():
tag_urn = TagUrn.create_from_string(tag_association.tag)
tag_urn = TagUrn.from_string(tag_association.tag)
mcps.append(
MetadataChangeProposalWrapper(
entityUrn=tag_urn.urn(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def transform(
)
if transformed_aspect:
# for end of stream records, we modify the workunit-id
structured_urn = Urn.create_from_string(urn)
structured_urn = Urn.from_string(urn)
simple_name = "-".join(structured_urn.get_entity_id())
record_metadata = envelope.metadata.copy()
record_metadata.update(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def get_entity_name(assertion: BaseEntityAssertion) -> Tuple[str, str, str]:
if qualified_name is not None:
parts = qualified_name.split(".")
else:
urn_id = Urn.create_from_string(assertion.entity).entity_ids[1]
urn_id = Urn.from_string(assertion.entity).entity_ids[1]
parts = urn_id.split(".")
if len(parts) > 3:
parts = parts[-3:]
Expand Down
29 changes: 12 additions & 17 deletions metadata-ingestion/src/datahub/lite/duckdb_lite.py
Original file line number Diff line number Diff line change
Expand Up @@ -609,15 +609,15 @@ def get_typed_aspect(
aspect_map, DataPlatformInstanceClass
) # type: ignore

needs_platform = Urn.create_from_string(entity_urn).get_type() in [
needs_platform = Urn.from_string(entity_urn).get_type() in [
"dataset",
"container",
"chart",
"dashboard",
"dataFlow",
"dataJob",
]
entity_urn_parsed = Urn.create_from_string(entity_urn)
entity_urn_parsed = Urn.from_string(entity_urn)
if entity_urn_parsed.get_type() in ["dataFlow", "dataJob"]:
self.add_edge(
entity_urn,
Expand All @@ -630,15 +630,12 @@ def get_typed_aspect(
# this is a top-level entity
if not dpi:
logger.debug(f"No data platform instance for {entity_urn}")
maybe_parent_urn = Urn.create_from_string(entity_urn).get_entity_id()[0]
maybe_parent_urn = Urn.from_string(entity_urn).get_entity_id()[0]
needs_dpi = False
if maybe_parent_urn.startswith(Urn.URN_PREFIX):
parent_urn = maybe_parent_urn
if (
Urn.create_from_string(maybe_parent_urn).get_type()
== "dataPlatform"
):
data_platform_urn = DataPlatformUrn.create_from_string(
if Urn.from_string(maybe_parent_urn).get_type() == "dataPlatform":
data_platform_urn = DataPlatformUrn.from_string(
maybe_parent_urn
)
needs_dpi = True
Expand All @@ -660,7 +657,7 @@ def get_typed_aspect(
logger.error(f"Failed to generate edges entity {entity_urn}", e)
parent_urn = str(data_platform_instance_urn)
else:
data_platform_urn = DataPlatformUrn.create_from_string(dpi.platform)
data_platform_urn = DataPlatformUrn.from_string(dpi.platform)
data_platform_instance = dpi.instance or "default"
data_platform_instance_urn = Urn(
entity_type="dataPlatformInstance",
Expand All @@ -673,9 +670,7 @@ def get_typed_aspect(
parent_urn = "__root__"

types = (
subtypes.typeNames
if subtypes
else [Urn.create_from_string(entity_urn).get_type()]
subtypes.typeNames if subtypes else [Urn.from_string(entity_urn).get_type()]
)
for t in types:
type_urn = Urn(entity_type="systemNode", entity_id=[parent_urn, t])
Expand All @@ -686,7 +681,7 @@ def get_typed_aspect(
def _create_edges_from_data_platform_instance(
self, data_platform_instance_urn: Urn
) -> None:
data_platform_urn = DataPlatformUrn.create_from_string(
data_platform_urn = DataPlatformUrn.from_string(
data_platform_instance_urn.get_entity_id()[0]
)
data_platform_instances_urn = Urn(
Expand Down Expand Up @@ -735,7 +730,7 @@ def post_update_hook(
if isinstance(aspect, DatasetPropertiesClass):
dp: DatasetPropertiesClass = aspect
if dp.name:
specific_urn = DatasetUrn.create_from_string(entity_urn)
specific_urn = DatasetUrn.from_string(entity_urn)
if (
specific_urn.get_data_platform_urn().get_entity_id_as_string()
== "looker"
Expand All @@ -755,23 +750,23 @@ def post_update_hook(
self.add_edge(entity_urn, "name", cp.name, remove_existing=True)
elif isinstance(aspect, DataPlatformInstanceClass):
dpi: DataPlatformInstanceClass = aspect
data_platform_urn = DataPlatformUrn.create_from_string(dpi.platform)
data_platform_urn = DataPlatformUrn.from_string(dpi.platform)
data_platform_instance = dpi.instance or "default"
data_platform_instance_urn = Urn(
entity_type="dataPlatformInstance",
entity_id=[str(data_platform_urn), data_platform_instance],
)
self._create_edges_from_data_platform_instance(data_platform_instance_urn)
elif isinstance(aspect, ChartInfoClass):
urn = Urn.create_from_string(entity_urn)
urn = Urn.from_string(entity_urn)
self.add_edge(
entity_urn,
"name",
aspect.title + f" ({urn.get_entity_id()[-1]})",
remove_existing=True,
)
elif isinstance(aspect, DashboardInfoClass):
urn = Urn.create_from_string(entity_urn)
urn = Urn.from_string(entity_urn)
self.add_edge(
entity_urn,
"name",
Expand Down
39 changes: 0 additions & 39 deletions metadata-ingestion/src/datahub/specific/chart.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import time
from typing import Dict, List, Optional, Union

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
AccessLevelClass,
AuditStampClass,
ChangeAuditStampsClass,
ChartInfoClass as ChartInfo,
ChartTypeClass,
Expand Down Expand Up @@ -47,43 +45,6 @@ def __init__(
)
self.ownership_patch_helper = OwnershipPatchHelper(self)

def _mint_auditstamp(self, message: Optional[str] = None) -> AuditStampClass:
"""
Creates an AuditStampClass instance with the current timestamp and other default values.
Args:
message: The message associated with the audit stamp (optional).
Returns:
An instance of AuditStampClass.
"""
return AuditStampClass(
time=int(time.time() * 1000.0),
actor="urn:li:corpuser:datahub",
message=message,
)

def _ensure_urn_type(
self, entity_type: str, edges: List[Edge], context: str
) -> None:
"""
Ensures that the destination URNs in the given edges have the specified entity type.
Args:
entity_type: The entity type to check against.
edges: A list of Edge objects.
context: The context or description of the operation.
Raises:
ValueError: If any of the destination URNs is not of the specified entity type.
"""
for e in edges:
urn = Urn.create_from_string(e.destinationUrn)
if not urn.get_type() == entity_type:
raise ValueError(
f"{context}: {e.destinationUrn} is not of type {entity_type}"
)

def add_owner(self, owner: Owner) -> "ChartPatchBuilder":
"""
Adds an owner to the ChartPatchBuilder.
Expand Down
39 changes: 0 additions & 39 deletions metadata-ingestion/src/datahub/specific/dashboard.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
import time
from typing import Dict, List, Optional, Union

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
AccessLevelClass,
AuditStampClass,
ChangeAuditStampsClass,
DashboardInfoClass as DashboardInfo,
EdgeClass as Edge,
Expand Down Expand Up @@ -46,43 +44,6 @@ def __init__(
)
self.ownership_patch_helper = OwnershipPatchHelper(self)

def _mint_auditstamp(self, message: Optional[str] = None) -> AuditStampClass:
"""
Creates an AuditStampClass instance with the current timestamp and other default values.
Args:
message: The message associated with the audit stamp (optional).
Returns:
An instance of AuditStampClass.
"""
return AuditStampClass(
time=int(time.time() * 1000.0),
actor="urn:li:corpuser:datahub",
message=message,
)

def _ensure_urn_type(
self, entity_type: str, edges: List[Edge], context: str
) -> None:
"""
Ensures that the destination URNs in the given edges have the specified entity type.
Args:
entity_type: The entity type to check against.
edges: A list of Edge objects.
context: The context or description of the operation.
Raises:
ValueError: If any of the destination URNs is not of the specified entity type.
"""
for e in edges:
urn = Urn.create_from_string(e.destinationUrn)
if not urn.get_type() == entity_type:
raise ValueError(
f"{context}: {e.destinationUrn} is not of type {entity_type}"
)

def add_owner(self, owner: Owner) -> "DashboardPatchBuilder":
"""
Adds an owner to the DashboardPatchBuilder.
Expand Down
Loading

0 comments on commit ce6474d

Please sign in to comment.