From c89f1e53805ced45fe53244daae355a86454bcf0 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 30 Dec 2024 20:57:20 -0500 Subject: [PATCH 01/11] replace all patch strings with paths for automatic quoting --- .../src/datahub/emitter/mcp_patch_builder.py | 21 ++++---- .../src/datahub/specific/chart.py | 34 ++++++------ .../src/datahub/specific/custom_properties.py | 4 +- .../src/datahub/specific/dashboard.py | 40 +++++++------- .../src/datahub/specific/datajob.py | 40 +++++++------- .../src/datahub/specific/dataproduct.py | 22 ++++---- .../src/datahub/specific/dataset.py | 53 +++++++++++-------- .../src/datahub/specific/form.py | 20 +++---- .../src/datahub/specific/ownership.py | 6 +-- .../datahub/specific/structured_property.py | 18 +++---- 10 files changed, 135 insertions(+), 123 deletions(-) diff --git a/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py index 1ed8ce1d5a6158..c6d6a2c95b6643 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py @@ -2,7 +2,9 @@ import time from collections import defaultdict from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Sequence, Union +from typing import Any, Dict, List, Optional, Tuple, Union + +from typing_extensions import LiteralString from datahub.emitter.aspect import JSON_PATCH_CONTENT_TYPE from datahub.emitter.serialization_helper import pre_json_transform @@ -28,16 +30,20 @@ def _recursive_to_obj(obj: Any) -> Any: return obj +PatchPath = Tuple[Union[LiteralString, Urn], ...] + + @dataclass class _Patch: op: str # one of ['add', 'remove', 'replace']; we don't support move, copy or test - path: str + path: PatchPath value: Any def to_obj(self) -> Dict: + quoted_path = "/" + "/".join(MetadataPatchProposal.quote(p) for p in self.path) return { "op": self.op, - "path": self.path, + "path": quoted_path, "value": _recursive_to_obj(self.value), } @@ -63,15 +69,12 @@ def __init__( # Json Patch quoting based on https://jsonpatch.com/#json-pointer @classmethod - def quote(cls, value: str) -> str: - return value.replace("~", "~0").replace("/", "~1") + def quote(cls, value: Union[str, Urn]) -> str: + return str(value).replace("~", "~0").replace("/", "~1") def _add_patch( - self, aspect_name: str, op: str, path: Union[str, Sequence[str]], value: Any + self, aspect_name: str, op: str, path: PatchPath, value: Any ) -> None: - if not isinstance(path, str): - path = "/" + "/".join(self.quote(p) for p in path) - # TODO: Validate that aspectName is a valid aspect for this entityType self.patches[aspect_name].append(_Patch(op, path, value)) diff --git a/metadata-ingestion/src/datahub/specific/chart.py b/metadata-ingestion/src/datahub/specific/chart.py index 104a7c21a07e2f..cc751c190e410d 100644 --- a/metadata-ingestion/src/datahub/specific/chart.py +++ b/metadata-ingestion/src/datahub/specific/chart.py @@ -120,7 +120,7 @@ def add_input_edge(self, input: Union[Edge, Urn, str]) -> "ChartPatchBuilder": self._add_patch( ChartInfo.ASPECT_NAME, "add", - path=f"/inputEdges/{self.quote(input_urn)}", + path=("inputEdges", input_urn), value=input_urn, ) return self @@ -138,7 +138,7 @@ def remove_input_edge(self, input: Union[str, Urn]) -> "ChartPatchBuilder": self._add_patch( ChartInfo.ASPECT_NAME, "remove", - path=f"/inputEdges/{self.quote(str(input))}", + path=("inputEdges", str(input)), value={}, ) return self @@ -159,7 +159,7 @@ def set_input_edges(self, inputs: List[Edge]) -> "ChartPatchBuilder": self._add_patch( ChartInfo.ASPECT_NAME, "add", - path="/inputEdges", + path=("inputEdges",), value=inputs, ) return self @@ -175,7 +175,7 @@ def add_tag(self, tag: Tag) -> "ChartPatchBuilder": The ChartPatchBuilder instance. """ self._add_patch( - GlobalTags.ASPECT_NAME, "add", path=f"/tags/{tag.tag}", value=tag + GlobalTags.ASPECT_NAME, "add", path=("tags", tag.tag), value=tag ) return self @@ -191,7 +191,7 @@ def remove_tag(self, tag: Union[str, Urn]) -> "ChartPatchBuilder": """ if isinstance(tag, str) and not tag.startswith("urn:li:tag:"): tag = TagUrn.create_from_id(tag) - self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=f"/tags/{tag}", value={}) + self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=("tags", tag), value={}) return self def add_term(self, term: Term) -> "ChartPatchBuilder": @@ -205,7 +205,7 @@ def add_term(self, term: Term) -> "ChartPatchBuilder": The ChartPatchBuilder instance. """ self._add_patch( - GlossaryTerms.ASPECT_NAME, "add", path=f"/terms/{term.urn}", value=term + GlossaryTerms.ASPECT_NAME, "add", path=("terms", term.urn), value=term ) return self @@ -222,7 +222,7 @@ def remove_term(self, term: Union[str, Urn]) -> "ChartPatchBuilder": if isinstance(term, str) and not term.startswith("urn:li:glossaryTerm:"): term = "urn:li:glossaryTerm:" + term self._add_patch( - GlossaryTerms.ASPECT_NAME, "remove", path=f"/terms/{term}", value={} + GlossaryTerms.ASPECT_NAME, "remove", path=("terms", term), value={} ) return self @@ -244,7 +244,7 @@ def set_custom_properties( self._add_patch( ChartInfo.ASPECT_NAME, "add", - path="/customProperties", + path=("customProperties",), value=custom_properties, ) return self @@ -281,7 +281,7 @@ def set_title(self, title: str) -> "ChartPatchBuilder": self._add_patch( ChartInfo.ASPECT_NAME, "add", - path="/title", + path=("title",), value=title, ) @@ -292,7 +292,7 @@ def set_description(self, description: str) -> "ChartPatchBuilder": self._add_patch( ChartInfo.ASPECT_NAME, "add", - path="/description", + path=("description",), value=description, ) @@ -303,7 +303,7 @@ def set_last_refreshed(self, last_refreshed: Optional[int]) -> "ChartPatchBuilde self._add_patch( ChartInfo.ASPECT_NAME, "add", - path="/lastRefreshed", + path=("lastRefreshed",), value=last_refreshed, ) @@ -316,7 +316,7 @@ def set_last_modified( self._add_patch( ChartInfo.ASPECT_NAME, "add", - path="/lastModified", + path=("lastModified",), value=last_modified, ) @@ -327,7 +327,7 @@ def set_external_url(self, external_url: Optional[str]) -> "ChartPatchBuilder": self._add_patch( ChartInfo.ASPECT_NAME, "add", - path="/externalUrl", + path=("externalUrl",), value=external_url, ) return self @@ -337,7 +337,7 @@ def set_chart_url(self, dashboard_url: Optional[str]) -> "ChartPatchBuilder": self._add_patch( ChartInfo.ASPECT_NAME, "add", - path="/chartUrl", + path=("chartUrl",), value=dashboard_url, ) @@ -350,7 +350,7 @@ def set_type( self._add_patch( ChartInfo.ASPECT_NAME, "add", - path="/type", + path=("type",), value=type, ) @@ -363,7 +363,7 @@ def set_access( self._add_patch( ChartInfo.ASPECT_NAME, "add", - path="/access", + path=("access",), value=access, ) @@ -375,7 +375,7 @@ def add_inputs(self, input_urns: Optional[List[str]]) -> "ChartPatchBuilder": self._add_patch( aspect_name=ChartInfo.ASPECT_NAME, op="add", - path=f"/inputs/{urn}", + path=("inputs", urn), value=urn, ) diff --git a/metadata-ingestion/src/datahub/specific/custom_properties.py b/metadata-ingestion/src/datahub/specific/custom_properties.py index d399a448cc0c23..c1d46b440b39f1 100644 --- a/metadata-ingestion/src/datahub/specific/custom_properties.py +++ b/metadata-ingestion/src/datahub/specific/custom_properties.py @@ -22,7 +22,7 @@ def add_property(self, key: str, value: str) -> "CustomPropertiesPatchHelper": self._parent._add_patch( self.aspect_name, "add", - path=f"/{self.aspect_field}/{key}", + path=(self.aspect_field, key), value=value, ) return self @@ -31,7 +31,7 @@ def remove_property(self, key: str) -> "CustomPropertiesPatchHelper": self._parent._add_patch( self.aspect_name, "remove", - path=f"/{self.aspect_field}/{key}", + path=(self.aspect_field, key), value={}, ) return self diff --git a/metadata-ingestion/src/datahub/specific/dashboard.py b/metadata-ingestion/src/datahub/specific/dashboard.py index da5abbfd1dc129..8c36e56406a6ea 100644 --- a/metadata-ingestion/src/datahub/specific/dashboard.py +++ b/metadata-ingestion/src/datahub/specific/dashboard.py @@ -126,7 +126,7 @@ def add_dataset_edge( self._add_patch( DashboardInfo.ASPECT_NAME, "add", - path=f"/datasetEdges/{self.quote(dataset_urn)}", + path=("datasetEdges", dataset_urn), value=dataset_edge, ) return self @@ -144,7 +144,7 @@ def remove_dataset_edge(self, dataset: Union[str, Urn]) -> "DashboardPatchBuilde self._add_patch( DashboardInfo.ASPECT_NAME, "remove", - path=f"/datasetEdges/{dataset}", + path=("datasetEdges", dataset), value={}, ) return self @@ -169,7 +169,7 @@ def set_dataset_edges(self, datasets: List[Edge]) -> "DashboardPatchBuilder": self._add_patch( DashboardInfo.ASPECT_NAME, "add", - path="/datasetEdges", + path=("datasetEdges",), value=datasets, ) return self @@ -209,7 +209,7 @@ def add_chart_edge(self, chart: Union[Edge, Urn, str]) -> "DashboardPatchBuilder self._add_patch( DashboardInfo.ASPECT_NAME, "add", - path=f"/chartEdges/{self.quote(chart_urn)}", + path=("chartEdges", chart_urn), value=chart_edge, ) return self @@ -227,7 +227,7 @@ def remove_chart_edge(self, chart: Union[str, Urn]) -> "DashboardPatchBuilder": self._add_patch( DashboardInfo.ASPECT_NAME, "remove", - path=f"/chartEdges/{chart}", + path=("chartEdges", chart), value={}, ) return self @@ -252,7 +252,7 @@ def set_chart_edges(self, charts: List[Edge]) -> "DashboardPatchBuilder": self._add_patch( DashboardInfo.ASPECT_NAME, "add", - path="/chartEdges", + path=("chartEdges",), value=charts, ) return self @@ -268,7 +268,7 @@ def add_tag(self, tag: Tag) -> "DashboardPatchBuilder": The DashboardPatchBuilder instance. """ self._add_patch( - GlobalTags.ASPECT_NAME, "add", path=f"/tags/{tag.tag}", value=tag + GlobalTags.ASPECT_NAME, "add", path=("tags", tag.tag), value=tag ) return self @@ -284,7 +284,7 @@ def remove_tag(self, tag: Union[str, Urn]) -> "DashboardPatchBuilder": """ if isinstance(tag, str) and not tag.startswith("urn:li:tag:"): tag = TagUrn.create_from_id(tag) - self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=f"/tags/{tag}", value={}) + self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=("tags", tag), value={}) return self def add_term(self, term: Term) -> "DashboardPatchBuilder": @@ -298,7 +298,7 @@ def add_term(self, term: Term) -> "DashboardPatchBuilder": The DashboardPatchBuilder instance. """ self._add_patch( - GlossaryTerms.ASPECT_NAME, "add", path=f"/terms/{term.urn}", value=term + GlossaryTerms.ASPECT_NAME, "add", path=("terms", term.urn), value=term ) return self @@ -315,7 +315,7 @@ def remove_term(self, term: Union[str, Urn]) -> "DashboardPatchBuilder": if isinstance(term, str) and not term.startswith("urn:li:glossaryTerm:"): term = "urn:li:glossaryTerm:" + term self._add_patch( - GlossaryTerms.ASPECT_NAME, "remove", path=f"/terms/{term}", value={} + GlossaryTerms.ASPECT_NAME, "remove", path=("terms", term), value={} ) return self @@ -337,7 +337,7 @@ def set_custom_properties( self._add_patch( DashboardInfo.ASPECT_NAME, "add", - path="/customProperties", + path=("customProperties",), value=custom_properties, ) return self @@ -374,7 +374,7 @@ def set_title(self, title: str) -> "DashboardPatchBuilder": self._add_patch( DashboardInfo.ASPECT_NAME, "add", - path="/title", + path=("title",), value=title, ) @@ -385,7 +385,7 @@ def set_description(self, description: str) -> "DashboardPatchBuilder": self._add_patch( DashboardInfo.ASPECT_NAME, "add", - path="/description", + path=("description",), value=description, ) @@ -405,7 +405,7 @@ def set_external_url(self, external_url: Optional[str]) -> "DashboardPatchBuilde self._add_patch( DashboardInfo.ASPECT_NAME, "add", - path="/externalUrl", + path=("externalUrl",), value=external_url, ) return self @@ -416,7 +416,7 @@ def add_charts(self, chart_urns: Optional[List[str]]) -> "DashboardPatchBuilder" self._add_patch( aspect_name=DashboardInfo.ASPECT_NAME, op="add", - path=f"/charts/{urn}", + path=("charts", urn), value=urn, ) @@ -430,7 +430,7 @@ def add_datasets( self._add_patch( aspect_name=DashboardInfo.ASPECT_NAME, op="add", - path=f"/datasets/{urn}", + path=("datasets", urn), value=urn, ) @@ -443,7 +443,7 @@ def set_dashboard_url( self._add_patch( DashboardInfo.ASPECT_NAME, "add", - path="/dashboardUrl", + path=("dashboardUrl",), value=dashboard_url, ) @@ -456,7 +456,7 @@ def set_access( self._add_patch( DashboardInfo.ASPECT_NAME, "add", - path="/access", + path=("access",), value=access, ) @@ -469,7 +469,7 @@ def set_last_refreshed( self._add_patch( DashboardInfo.ASPECT_NAME, "add", - path="/lastRefreshed", + path=("lastRefreshed",), value=last_refreshed, ) @@ -482,7 +482,7 @@ def set_last_modified( self._add_patch( DashboardInfo.ASPECT_NAME, "add", - path="/lastModified", + path=("lastModified",), value=last_modified, ) diff --git a/metadata-ingestion/src/datahub/specific/datajob.py b/metadata-ingestion/src/datahub/specific/datajob.py index 6ff4741b09c26a..517aad684b656e 100644 --- a/metadata-ingestion/src/datahub/specific/datajob.py +++ b/metadata-ingestion/src/datahub/specific/datajob.py @@ -120,7 +120,7 @@ def add_input_datajob(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde self._add_patch( DataJobInputOutput.ASPECT_NAME, "add", - path=f"/inputDatajobEdges/{self.quote(input_urn)}", + path=("inputDatajobEdges", input_urn), value=input_edge, ) return self @@ -138,7 +138,7 @@ def remove_input_datajob(self, input: Union[str, Urn]) -> "DataJobPatchBuilder": self._add_patch( DataJobInputOutput.ASPECT_NAME, "remove", - path=f"/inputDatajobEdges/{input}", + path=("inputDatajobEdges", input), value={}, ) return self @@ -163,7 +163,7 @@ def set_input_datajobs(self, inputs: List[Edge]) -> "DataJobPatchBuilder": self._add_patch( DataJobInputOutput.ASPECT_NAME, "add", - path="/inputDatajobEdges", + path=("inputDatajobEdges",), value=inputs, ) return self @@ -201,7 +201,7 @@ def add_input_dataset(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilde self._add_patch( DataJobInputOutput.ASPECT_NAME, "add", - path=f"/inputDatasetEdges/{self.quote(input_urn)}", + path=("inputDatasetEdges", input_urn), value=input_edge, ) return self @@ -219,7 +219,7 @@ def remove_input_dataset(self, input: Union[str, Urn]) -> "DataJobPatchBuilder": self._add_patch( DataJobInputOutput.ASPECT_NAME, "remove", - path=f"/inputDatasetEdges/{self.quote(str(input))}", + path=("inputDatasetEdges", input), value={}, ) return self @@ -244,7 +244,7 @@ def set_input_datasets(self, inputs: List[Edge]) -> "DataJobPatchBuilder": self._add_patch( DataJobInputOutput.ASPECT_NAME, "add", - path="/inputDatasetEdges", + path=("inputDatasetEdges",), value=inputs, ) return self @@ -284,7 +284,7 @@ def add_output_dataset( self._add_patch( DataJobInputOutput.ASPECT_NAME, "add", - path=f"/outputDatasetEdges/{self.quote(output_urn)}", + path=("outputDatasetEdges", output_urn), value=output_edge, ) return self @@ -302,7 +302,7 @@ def remove_output_dataset(self, output: Union[str, Urn]) -> "DataJobPatchBuilder self._add_patch( DataJobInputOutput.ASPECT_NAME, "remove", - path=f"/outputDatasetEdges/{self.quote(str(output))}", + path=("outputDatasetEdges", output), value={}, ) return self @@ -327,7 +327,7 @@ def set_output_datasets(self, outputs: List[Edge]) -> "DataJobPatchBuilder": self._add_patch( DataJobInputOutput.ASPECT_NAME, "add", - path="/outputDatasetEdges", + path=("outputDatasetEdges",), value=outputs, ) return self @@ -351,7 +351,7 @@ def add_input_dataset_field(self, input: Union[Urn, str]) -> "DataJobPatchBuilde self._add_patch( DataJobInputOutput.ASPECT_NAME, "add", - path=f"/inputDatasetFields/{self.quote(input_urn)}", + path=("inputDatasetFields", input_urn), value={}, ) return self @@ -372,7 +372,7 @@ def remove_input_dataset_field( self._add_patch( DataJobInputOutput.ASPECT_NAME, "remove", - path=f"/inputDatasetFields/{self.quote(input_urn)}", + path=("inputDatasetFields", input_urn), value={}, ) return self @@ -397,7 +397,7 @@ def set_input_dataset_fields(self, inputs: List[Edge]) -> "DataJobPatchBuilder": self._add_patch( DataJobInputOutput.ASPECT_NAME, "add", - path="/inputDatasetFields", + path=("inputDatasetFields",), value=inputs, ) return self @@ -423,7 +423,7 @@ def add_output_dataset_field( self._add_patch( DataJobInputOutput.ASPECT_NAME, "add", - path=f"/outputDatasetFields/{self.quote(output_urn)}", + path=("outputDatasetFields", output_urn), value={}, ) return self @@ -444,7 +444,7 @@ def remove_output_dataset_field( self._add_patch( DataJobInputOutput.ASPECT_NAME, "remove", - path=f"/outputDatasetFields/{self.quote(output_urn)}", + path=("outputDatasetFields", output_urn), value={}, ) return self @@ -469,7 +469,7 @@ def set_output_dataset_fields(self, outputs: List[Edge]) -> "DataJobPatchBuilder self._add_patch( DataJobInputOutput.ASPECT_NAME, "add", - path="/outputDatasetFields", + path=("outputDatasetFields",), value=outputs, ) return self @@ -485,7 +485,7 @@ def add_tag(self, tag: Tag) -> "DataJobPatchBuilder": The DataJobPatchBuilder instance. """ self._add_patch( - GlobalTags.ASPECT_NAME, "add", path=f"/tags/{tag.tag}", value=tag + GlobalTags.ASPECT_NAME, "add", path=("tags", tag.tag), value=tag ) return self @@ -501,7 +501,7 @@ def remove_tag(self, tag: Union[str, Urn]) -> "DataJobPatchBuilder": """ if isinstance(tag, str) and not tag.startswith("urn:li:tag:"): tag = TagUrn.create_from_id(tag) - self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=f"/tags/{tag}", value={}) + self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=("tags", tag), value={}) return self def add_term(self, term: Term) -> "DataJobPatchBuilder": @@ -515,7 +515,7 @@ def add_term(self, term: Term) -> "DataJobPatchBuilder": The DataJobPatchBuilder instance. """ self._add_patch( - GlossaryTerms.ASPECT_NAME, "add", path=f"/terms/{term.urn}", value=term + GlossaryTerms.ASPECT_NAME, "add", path=("terms", term.urn), value=term ) return self @@ -532,7 +532,7 @@ def remove_term(self, term: Union[str, Urn]) -> "DataJobPatchBuilder": if isinstance(term, str) and not term.startswith("urn:li:glossaryTerm:"): term = "urn:li:glossaryTerm:" + term self._add_patch( - GlossaryTerms.ASPECT_NAME, "remove", path=f"/terms/{term}", value={} + GlossaryTerms.ASPECT_NAME, "remove", path=("terms", term), value={} ) return self @@ -554,7 +554,7 @@ def set_custom_properties( self._add_patch( DataJobInfo.ASPECT_NAME, "add", - path="/customProperties", + path=("customProperties",), value=custom_properties, ) return self diff --git a/metadata-ingestion/src/datahub/specific/dataproduct.py b/metadata-ingestion/src/datahub/specific/dataproduct.py index f9830a4b23df05..880fb423a3184e 100644 --- a/metadata-ingestion/src/datahub/specific/dataproduct.py +++ b/metadata-ingestion/src/datahub/specific/dataproduct.py @@ -55,19 +55,19 @@ def set_owners(self, owners: List[Owner]) -> "DataProductPatchBuilder": def add_tag(self, tag: Tag) -> "DataProductPatchBuilder": self._add_patch( - GlobalTags.ASPECT_NAME, "add", path=f"/tags/{tag.tag}", value=tag + GlobalTags.ASPECT_NAME, "add", path=("tags", tag.tag), value=tag ) return self def remove_tag(self, tag: Union[str, Urn]) -> "DataProductPatchBuilder": if isinstance(tag, str) and not tag.startswith("urn:li:tag:"): tag = TagUrn.create_from_id(tag) - self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=f"/tags/{tag}", value={}) + self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=("tags", tag), value={}) return self def add_term(self, term: Term) -> "DataProductPatchBuilder": self._add_patch( - GlossaryTerms.ASPECT_NAME, "add", path=f"/terms/{term.urn}", value=term + GlossaryTerms.ASPECT_NAME, "add", path=("terms", term.urn), value=term ) return self @@ -75,7 +75,7 @@ def remove_term(self, term: Union[str, Urn]) -> "DataProductPatchBuilder": if isinstance(term, str) and not term.startswith("urn:li:glossaryTerm:"): term = "urn:li:glossaryTerm:" + term self._add_patch( - GlossaryTerms.ASPECT_NAME, "remove", path=f"/terms/{term}", value={} + GlossaryTerms.ASPECT_NAME, "remove", path=("terms", term), value={} ) return self @@ -83,7 +83,7 @@ def set_name(self, name: str) -> "DataProductPatchBuilder": self._add_patch( DataProductProperties.ASPECT_NAME, "add", - path="/name", + path=("name",), value=name, ) return self @@ -92,7 +92,7 @@ def set_description(self, description: str) -> "DataProductPatchBuilder": self._add_patch( DataProductProperties.ASPECT_NAME, "add", - path="/description", + path=("description",), value=description, ) return self @@ -103,7 +103,7 @@ def set_custom_properties( self._add_patch( DataProductProperties.ASPECT_NAME, "add", - path="/customProperties", + path=("customProperties",), value=custom_properties, ) return self @@ -122,7 +122,7 @@ def set_assets( self._add_patch( DataProductProperties.ASPECT_NAME, "add", - path="/assets", + path=("assets",), value=assets, ) return self @@ -131,7 +131,7 @@ def add_asset(self, asset_urn: str) -> "DataProductPatchBuilder": self._add_patch( DataProductProperties.ASPECT_NAME, "add", - path=f"/assets/{self.quote(asset_urn)}", + path=("assets", asset_urn), value=DataProductAssociation(destinationUrn=asset_urn), ) return self @@ -140,7 +140,7 @@ def remove_asset(self, asset_urn: str) -> "DataProductPatchBuilder": self._add_patch( DataProductProperties.ASPECT_NAME, "remove", - path=f"/assets/{self.quote(asset_urn)}", + path=("assets", asset_urn), value={}, ) return self @@ -149,7 +149,7 @@ def set_external_url(self, external_url: str) -> "DataProductPatchBuilder": self._add_patch( DataProductProperties.ASPECT_NAME, "add", - path="/externalUrl", + path=("externalUrl",), value=external_url, ) return self diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index b171dc4cc2939f..d5a1126065ad5e 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -1,6 +1,6 @@ from typing import Dict, Generic, List, Optional, Tuple, TypeVar, Union -from datahub.emitter.mcp_patch_builder import MetadataPatchProposal +from datahub.emitter.mcp_patch_builder import MetadataPatchProposal, PatchPath from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp from datahub.metadata.schema_classes import ( DatasetPropertiesClass as DatasetProperties, @@ -48,7 +48,7 @@ def add_tag(self, tag: Tag) -> "FieldPatchHelper": self._parent._add_patch( self.aspect_name, "add", - path=f"/{self.aspect_field}/{self.field_path}/globalTags/tags/{tag.tag}", + path=(self.aspect_field, self.field_path, "globalTags", "tags", tag.tag), value=tag, ) return self @@ -59,7 +59,7 @@ def remove_tag(self, tag: Union[str, Urn]) -> "FieldPatchHelper": self._parent._add_patch( self.aspect_name, "remove", - path=f"/{self.aspect_field}/{self.field_path}/globalTags/tags/{tag}", + path=(self.aspect_field, self.field_path, "globalTags", "tags", tag), value={}, ) return self @@ -68,7 +68,13 @@ def add_term(self, term: Term) -> "FieldPatchHelper": self._parent._add_patch( self.aspect_name, "add", - path=f"/{self.aspect_field}/{self.field_path}/glossaryTerms/terms/{term.urn}", + path=( + self.aspect_field, + self.field_path, + "glossaryTerms", + "terms", + term.urn, + ), value=term, ) return self @@ -79,7 +85,7 @@ def remove_term(self, term: Union[str, Urn]) -> "FieldPatchHelper": self._parent._add_patch( self.aspect_name, "remove", - path=f"/{self.aspect_field}/{self.field_path}/glossaryTerms/terms/{term}", + path=(self.aspect_field, self.field_path, "glossaryTerms", "terms", term), value={}, ) return self @@ -125,7 +131,7 @@ def add_upstream_lineage(self, upstream: Upstream) -> "DatasetPatchBuilder": self._add_patch( UpstreamLineage.ASPECT_NAME, "add", - path=f"/upstreams/{self.quote(upstream.dataset)}", + path=("upstreams", upstream.dataset), value=upstream, ) return self @@ -136,14 +142,14 @@ def remove_upstream_lineage( self._add_patch( UpstreamLineage.ASPECT_NAME, "remove", - path=f"/upstreams/{dataset}", + path=("upstreams", dataset), value={}, ) return self def set_upstream_lineages(self, upstreams: List[Upstream]) -> "DatasetPatchBuilder": self._add_patch( - UpstreamLineage.ASPECT_NAME, "add", path="/upstreams", value=upstreams + UpstreamLineage.ASPECT_NAME, "add", path=("upstreams",), value=upstreams ) return self @@ -181,10 +187,13 @@ def get_fine_grained_key( @classmethod def quote_fine_grained_path( cls, transform_op: str, downstream_urn: str, query_id: str, upstream_urn: str - ) -> str: + ) -> PatchPath: return ( - f"/fineGrainedLineages/{cls.quote(transform_op)}/" - f"{cls.quote(downstream_urn)}/{cls.quote(query_id)}/{cls.quote(upstream_urn)}" + "fineGrainedLineages", + transform_op, + downstream_urn, + query_id, + upstream_urn, ) def remove_fine_grained_upstream_lineage( @@ -212,26 +221,26 @@ def set_fine_grained_upstream_lineages( self._add_patch( UpstreamLineage.ASPECT_NAME, "add", - path="/fineGrainedLineages", + path=("fineGrainedLineages",), value=fine_grained_lineages, ) return self def add_tag(self, tag: Tag) -> "DatasetPatchBuilder": self._add_patch( - GlobalTags.ASPECT_NAME, "add", path=f"/tags/{tag.tag}", value=tag + GlobalTags.ASPECT_NAME, "add", path=("tags", tag.tag), value=tag ) return self def remove_tag(self, tag: Union[str, Urn]) -> "DatasetPatchBuilder": if isinstance(tag, str) and not tag.startswith("urn:li:tag:"): tag = TagUrn.create_from_id(tag) - self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=f"/tags/{tag}", value={}) + self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=("tags", tag), value={}) return self def add_term(self, term: Term) -> "DatasetPatchBuilder": self._add_patch( - GlossaryTerms.ASPECT_NAME, "add", path=f"/terms/{term.urn}", value=term + GlossaryTerms.ASPECT_NAME, "add", path=("terms", term.urn), value=term ) return self @@ -239,7 +248,7 @@ def remove_term(self, term: Union[str, Urn]) -> "DatasetPatchBuilder": if isinstance(term, str) and not term.startswith("urn:li:glossaryTerm:"): term = "urn:li:glossaryTerm:" + term self._add_patch( - GlossaryTerms.ASPECT_NAME, "remove", path=f"/terms/{term}", value={} + GlossaryTerms.ASPECT_NAME, "remove", path=("terms", term), value={} ) return self @@ -269,7 +278,7 @@ def set_description( else EditableDatasetProperties.ASPECT_NAME ), "add", - path="/description", + path=("description",), value=description, ) return self @@ -280,7 +289,7 @@ def set_custom_properties( self._add_patch( DatasetProperties.ASPECT_NAME, "add", - path="/customProperties", + path=("customProperties",), value=custom_properties, ) return self @@ -308,7 +317,7 @@ def set_display_name( self._add_patch( DatasetProperties.ASPECT_NAME, "add", - path="/name", + path=("name",), value=display_name, ) return self @@ -320,7 +329,7 @@ def set_qualified_name( self._add_patch( DatasetProperties.ASPECT_NAME, "add", - path="/qualifiedName", + path=("qualifiedName",), value=qualified_name, ) return self @@ -332,7 +341,7 @@ def set_created( self._add_patch( DatasetProperties.ASPECT_NAME, "add", - path="/created", + path=("created",), value=timestamp, ) return self @@ -344,7 +353,7 @@ def set_last_modified( self._add_patch( DatasetProperties.ASPECT_NAME, "add", - path="/lastModified", + path=("lastModified",), value=timestamp, ) return self diff --git a/metadata-ingestion/src/datahub/specific/form.py b/metadata-ingestion/src/datahub/specific/form.py index 78182c202f7162..04fdc77e8e65c3 100644 --- a/metadata-ingestion/src/datahub/specific/form.py +++ b/metadata-ingestion/src/datahub/specific/form.py @@ -47,7 +47,7 @@ def set_name(self, name: Optional[str] = None) -> "FormPatchBuilder": self._add_patch( FormInfo.ASPECT_NAME, "add", - path="/name", + path=("name",), value=name, ) return self @@ -57,7 +57,7 @@ def set_description(self, description: Optional[str] = None) -> "FormPatchBuilde self._add_patch( FormInfo.ASPECT_NAME, "add", - path="/description", + path=("description",), value=description, ) return self @@ -67,7 +67,7 @@ def set_type(self, type: Optional[str] = None) -> "FormPatchBuilder": self._add_patch( FormInfo.ASPECT_NAME, "add", - path="/type", + path=("type",), value=type, ) return self @@ -76,7 +76,7 @@ def add_prompt(self, prompt: FormPromptClass) -> "FormPatchBuilder": self._add_patch( FormInfo.ASPECT_NAME, "add", - path=f"/prompts/{self.quote(prompt.id)}", + path=("prompts", prompt.id), value=prompt, ) return self @@ -90,7 +90,7 @@ def remove_prompt(self, prompt_id: str) -> "FormPatchBuilder": self._add_patch( FormInfo.ASPECT_NAME, "remove", - path=f"/prompts/{self.quote(prompt_id)}", + path=("prompts", prompt_id), value=prompt_id, ) return self @@ -104,7 +104,7 @@ def set_ownership_form(self, is_ownership: bool) -> "FormPatchBuilder": self._add_patch( FormInfo.ASPECT_NAME, "add", - path="/actors/owners", + path=("actors", "owners"), value=is_ownership, ) return self @@ -113,7 +113,7 @@ def add_assigned_user(self, user_urn: Union[str, Urn]) -> "FormPatchBuilder": self._add_patch( FormInfo.ASPECT_NAME, "add", - path=f"/actors/users/{self.quote(str(user_urn))}", + path=("actors", "users", user_urn), value=user_urn, ) return self @@ -122,7 +122,7 @@ def remove_assigned_user(self, user_urn: Union[str, Urn]) -> "FormPatchBuilder": self._add_patch( FormInfo.ASPECT_NAME, "remove", - path=f"/actors/users/{self.quote(str(user_urn))}", + path=("actors", "users", user_urn), value=user_urn, ) return self @@ -131,7 +131,7 @@ def add_assigned_group(self, group_urn: Union[str, Urn]) -> "FormPatchBuilder": self._add_patch( FormInfo.ASPECT_NAME, "add", - path=f"/actors/groups/{self.quote(str(group_urn))}", + path=("actors", "groups", group_urn), value=group_urn, ) return self @@ -140,7 +140,7 @@ def remove_assigned_group(self, group_urn: Union[str, Urn]) -> "FormPatchBuilder self._add_patch( FormInfo.ASPECT_NAME, "remove", - path=f"/actors/groups/{self.quote(str(group_urn))}", + path=("actors", "groups", group_urn), value=group_urn, ) return self diff --git a/metadata-ingestion/src/datahub/specific/ownership.py b/metadata-ingestion/src/datahub/specific/ownership.py index b377a8814f38a0..abeb8ea061ba94 100644 --- a/metadata-ingestion/src/datahub/specific/ownership.py +++ b/metadata-ingestion/src/datahub/specific/ownership.py @@ -22,7 +22,7 @@ def add_owner(self, owner: OwnerClass) -> "OwnershipPatchHelper": self._parent._add_patch( OwnershipClass.ASPECT_NAME, "add", - path=f"/owners/{owner.owner}/{owner.type}", + path=("owners", owner.owner, str(owner.type)), value=owner, ) return self @@ -36,13 +36,13 @@ def remove_owner( self._parent._add_patch( OwnershipClass.ASPECT_NAME, "remove", - path=f"/owners/{owner}" + (f"/{owner_type}" if owner_type else ""), + path=("owners", owner) + ((str(owner_type),) if owner_type else ()), value=owner, ) return self def set_owners(self, owners: List[OwnerClass]) -> "OwnershipPatchHelper": self._parent._add_patch( - OwnershipClass.ASPECT_NAME, "add", path="/owners", value=owners + OwnershipClass.ASPECT_NAME, "add", path=("owners",), value=owners ) return self diff --git a/metadata-ingestion/src/datahub/specific/structured_property.py b/metadata-ingestion/src/datahub/specific/structured_property.py index 50f1f079c2aa72..bcae174ed3c4f4 100644 --- a/metadata-ingestion/src/datahub/specific/structured_property.py +++ b/metadata-ingestion/src/datahub/specific/structured_property.py @@ -29,7 +29,7 @@ def set_qualified_name( self._add_patch( StructuredPropertyDefinition.ASPECT_NAME, "add", - path="/qualifiedName", + path=("qualifiedName",), value=qualified_name, ) return self @@ -41,7 +41,7 @@ def set_display_name( self._add_patch( StructuredPropertyDefinition.ASPECT_NAME, "add", - path="/displayName", + path=("displayName",), value=display_name, ) return self @@ -53,7 +53,7 @@ def set_value_type( self._add_patch( StructuredPropertyDefinition.ASPECT_NAME, "add", - path="/valueType", + path=("valueType",), value=value_type, ) return self @@ -66,7 +66,7 @@ def set_type_qualifier( self._add_patch( StructuredPropertyDefinition.ASPECT_NAME, "add", - path="/typeQualifier", + path=("typeQualifier",), value=type_qualifier, ) return self @@ -78,7 +78,7 @@ def add_allowed_value( self._add_patch( StructuredPropertyDefinition.ASPECT_NAME, "add", - path=f"/allowedValues/{str(allowed_value.get('value'))}", + path=("allowedValues", str(allowed_value.get("value"))), value=allowed_value, ) return self @@ -87,7 +87,7 @@ def set_cardinality(self, cardinality: str) -> "StructuredPropertyPatchBuilder": self._add_patch( StructuredPropertyDefinition.ASPECT_NAME, "add", - path="/cardinality", + path=("cardinality",), value=cardinality, ) return self @@ -98,7 +98,7 @@ def add_entity_type( self._add_patch( StructuredPropertyDefinition.ASPECT_NAME, "add", - path=f"/entityTypes/{self.quote(str(entity_type))}", + path=("entityTypes", str(entity_type)), value=entity_type, ) return self @@ -110,7 +110,7 @@ def set_description( self._add_patch( StructuredPropertyDefinition.ASPECT_NAME, "add", - path="/description", + path=("description",), value=description, ) return self @@ -119,7 +119,7 @@ def set_immutable(self, immutable: bool) -> "StructuredPropertyPatchBuilder": self._add_patch( StructuredPropertyDefinition.ASPECT_NAME, "add", - path="/immutable", + path=("immutable",), value=immutable, ) return self From 6386057b34ffb93d385cbf77994ef1fb8118d11c Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 30 Dec 2024 21:06:36 -0500 Subject: [PATCH 02/11] add PatchOp literal type --- .../src/datahub/emitter/mcp_patch_builder.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py index c6d6a2c95b6643..221b7bac0a5628 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py @@ -2,7 +2,7 @@ import time from collections import defaultdict from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Dict, List, Literal, Optional, Tuple, Union from typing_extensions import LiteralString @@ -31,11 +31,12 @@ def _recursive_to_obj(obj: Any) -> Any: PatchPath = Tuple[Union[LiteralString, Urn], ...] +PatchOp = Literal["add", "remove", "replace"] @dataclass class _Patch: - op: str # one of ['add', 'remove', 'replace']; we don't support move, copy or test + op: PatchOp path: PatchPath value: Any @@ -73,7 +74,11 @@ def quote(cls, value: Union[str, Urn]) -> str: return str(value).replace("~", "~0").replace("/", "~1") def _add_patch( - self, aspect_name: str, op: str, path: PatchPath, value: Any + self, + aspect_name: str, + op: PatchOp, + path: PatchPath, + value: Any, ) -> None: # TODO: Validate that aspectName is a valid aspect for this entityType self.patches[aspect_name].append(_Patch(op, path, value)) From 11ae8db42943ce7e5b564e1fbff2b65174b4cf76 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Mon, 30 Dec 2024 21:11:08 -0500 Subject: [PATCH 03/11] add HasOwnershipPatch helper --- .../src/datahub/specific/chart.py | 52 +----------------- .../src/datahub/specific/dashboard.py | 52 +----------------- .../src/datahub/specific/datajob.py | 52 +----------------- .../src/datahub/specific/dataproduct.py | 24 +-------- .../src/datahub/specific/dataset.py | 24 +-------- .../src/datahub/specific/form.py | 24 +-------- .../src/datahub/specific/ownership.py | 53 +++++++++++++------ 7 files changed, 48 insertions(+), 233 deletions(-) diff --git a/metadata-ingestion/src/datahub/specific/chart.py b/metadata-ingestion/src/datahub/specific/chart.py index cc751c190e410d..a1d399ae4d6206 100644 --- a/metadata-ingestion/src/datahub/specific/chart.py +++ b/metadata-ingestion/src/datahub/specific/chart.py @@ -11,18 +11,16 @@ GlossaryTermAssociationClass as Term, GlossaryTermsClass as GlossaryTerms, KafkaAuditHeaderClass, - OwnerClass as Owner, - OwnershipTypeClass, SystemMetadataClass, TagAssociationClass as Tag, ) from datahub.specific.custom_properties import CustomPropertiesPatchHelper -from datahub.specific.ownership import OwnershipPatchHelper +from datahub.specific.ownership import HasOwnershipPatch from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import Urn -class ChartPatchBuilder(MetadataPatchProposal): +class ChartPatchBuilder(HasOwnershipPatch, MetadataPatchProposal): def __init__( self, urn: str, @@ -43,52 +41,6 @@ def __init__( self.custom_properties_patch_helper = CustomPropertiesPatchHelper( self, ChartInfo.ASPECT_NAME ) - self.ownership_patch_helper = OwnershipPatchHelper(self) - - def add_owner(self, owner: Owner) -> "ChartPatchBuilder": - """ - Adds an owner to the ChartPatchBuilder. - - Args: - owner: The Owner object to add. - - Returns: - The ChartPatchBuilder instance. - """ - self.ownership_patch_helper.add_owner(owner) - return self - - def remove_owner( - self, owner: str, owner_type: Optional[OwnershipTypeClass] = None - ) -> "ChartPatchBuilder": - """ - Removes an owner from the ChartPatchBuilder. - - Args: - owner: The owner to remove. - owner_type: The ownership type of the owner (optional). - - Returns: - The ChartPatchBuilder instance. - - Notes: - `owner_type` is optional. - """ - self.ownership_patch_helper.remove_owner(owner, owner_type) - return self - - def set_owners(self, owners: List[Owner]) -> "ChartPatchBuilder": - """ - Sets the owners of the ChartPatchBuilder. - - Args: - owners: A list of Owner objects. - - Returns: - The ChartPatchBuilder instance. - """ - self.ownership_patch_helper.set_owners(owners) - return self def add_input_edge(self, input: Union[Edge, Urn, str]) -> "ChartPatchBuilder": """ diff --git a/metadata-ingestion/src/datahub/specific/dashboard.py b/metadata-ingestion/src/datahub/specific/dashboard.py index 8c36e56406a6ea..c07e756482fbfa 100644 --- a/metadata-ingestion/src/datahub/specific/dashboard.py +++ b/metadata-ingestion/src/datahub/specific/dashboard.py @@ -10,18 +10,16 @@ GlossaryTermAssociationClass as Term, GlossaryTermsClass as GlossaryTerms, KafkaAuditHeaderClass, - OwnerClass as Owner, - OwnershipTypeClass, SystemMetadataClass, TagAssociationClass as Tag, ) from datahub.specific.custom_properties import CustomPropertiesPatchHelper -from datahub.specific.ownership import OwnershipPatchHelper +from datahub.specific.ownership import HasOwnershipPatch from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import Urn -class DashboardPatchBuilder(MetadataPatchProposal): +class DashboardPatchBuilder(HasOwnershipPatch, MetadataPatchProposal): def __init__( self, urn: str, @@ -42,52 +40,6 @@ def __init__( self.custom_properties_patch_helper = CustomPropertiesPatchHelper( self, DashboardInfo.ASPECT_NAME ) - self.ownership_patch_helper = OwnershipPatchHelper(self) - - def add_owner(self, owner: Owner) -> "DashboardPatchBuilder": - """ - Adds an owner to the DashboardPatchBuilder. - - Args: - owner: The Owner object to add. - - Returns: - The DashboardPatchBuilder instance. - """ - self.ownership_patch_helper.add_owner(owner) - return self - - def remove_owner( - self, owner: str, owner_type: Optional[OwnershipTypeClass] = None - ) -> "DashboardPatchBuilder": - """ - Removes an owner from the DashboardPatchBuilder. - - Args: - owner: The owner to remove. - owner_type: The ownership type of the owner (optional). - - Returns: - The DashboardPatchBuilder instance. - - Notes: - `owner_type` is optional. - """ - self.ownership_patch_helper.remove_owner(owner, owner_type) - return self - - def set_owners(self, owners: List[Owner]) -> "DashboardPatchBuilder": - """ - Sets the owners of the DashboardPatchBuilder. - - Args: - owners: A list of Owner objects. - - Returns: - The DashboardPatchBuilder instance. - """ - self.ownership_patch_helper.set_owners(owners) - return self def add_dataset_edge( self, dataset: Union[Edge, Urn, str] diff --git a/metadata-ingestion/src/datahub/specific/datajob.py b/metadata-ingestion/src/datahub/specific/datajob.py index 517aad684b656e..ff04957e134c99 100644 --- a/metadata-ingestion/src/datahub/specific/datajob.py +++ b/metadata-ingestion/src/datahub/specific/datajob.py @@ -9,17 +9,15 @@ GlossaryTermAssociationClass as Term, GlossaryTermsClass as GlossaryTerms, KafkaAuditHeaderClass, - OwnerClass as Owner, - OwnershipTypeClass, SystemMetadataClass, TagAssociationClass as Tag, ) from datahub.metadata.urns import SchemaFieldUrn, TagUrn, Urn from datahub.specific.custom_properties import CustomPropertiesPatchHelper -from datahub.specific.ownership import OwnershipPatchHelper +from datahub.specific.ownership import HasOwnershipPatch -class DataJobPatchBuilder(MetadataPatchProposal): +class DataJobPatchBuilder(HasOwnershipPatch, MetadataPatchProposal): def __init__( self, urn: str, @@ -40,52 +38,6 @@ def __init__( self.custom_properties_patch_helper = CustomPropertiesPatchHelper( self, DataJobInfo.ASPECT_NAME ) - self.ownership_patch_helper = OwnershipPatchHelper(self) - - def add_owner(self, owner: Owner) -> "DataJobPatchBuilder": - """ - Adds an owner to the DataJobPatchBuilder. - - Args: - owner: The Owner object to add. - - Returns: - The DataJobPatchBuilder instance. - """ - self.ownership_patch_helper.add_owner(owner) - return self - - def remove_owner( - self, owner: str, owner_type: Optional[OwnershipTypeClass] = None - ) -> "DataJobPatchBuilder": - """ - Removes an owner from the DataJobPatchBuilder. - - Args: - owner: The owner to remove. - owner_type: The ownership type of the owner (optional). - - Returns: - The DataJobPatchBuilder instance. - - Notes: - `owner_type` is optional. - """ - self.ownership_patch_helper.remove_owner(owner, owner_type) - return self - - def set_owners(self, owners: List[Owner]) -> "DataJobPatchBuilder": - """ - Sets the owners of the DataJobPatchBuilder. - - Args: - owners: A list of Owner objects. - - Returns: - The DataJobPatchBuilder instance. - """ - self.ownership_patch_helper.set_owners(owners) - return self def add_input_datajob(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilder": """ diff --git a/metadata-ingestion/src/datahub/specific/dataproduct.py b/metadata-ingestion/src/datahub/specific/dataproduct.py index 880fb423a3184e..0c49fd80d9922b 100644 --- a/metadata-ingestion/src/datahub/specific/dataproduct.py +++ b/metadata-ingestion/src/datahub/specific/dataproduct.py @@ -8,18 +8,16 @@ GlossaryTermAssociationClass as Term, GlossaryTermsClass as GlossaryTerms, KafkaAuditHeaderClass, - OwnerClass as Owner, - OwnershipTypeClass, SystemMetadataClass, TagAssociationClass as Tag, ) from datahub.specific.custom_properties import CustomPropertiesPatchHelper -from datahub.specific.ownership import OwnershipPatchHelper +from datahub.specific.ownership import HasOwnershipPatch from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import Urn -class DataProductPatchBuilder(MetadataPatchProposal): +class DataProductPatchBuilder(HasOwnershipPatch, MetadataPatchProposal): def __init__( self, urn: str, @@ -34,24 +32,6 @@ def __init__( self.custom_properties_patch_helper = CustomPropertiesPatchHelper( self, DataProductProperties.ASPECT_NAME ) - self.ownership_patch_helper = OwnershipPatchHelper(self) - - def add_owner(self, owner: Owner) -> "DataProductPatchBuilder": - self.ownership_patch_helper.add_owner(owner) - return self - - def remove_owner( - self, owner: str, owner_type: Optional[OwnershipTypeClass] = None - ) -> "DataProductPatchBuilder": - """ - param: owner_type is optional - """ - self.ownership_patch_helper.remove_owner(owner, owner_type) - return self - - def set_owners(self, owners: List[Owner]) -> "DataProductPatchBuilder": - self.ownership_patch_helper.set_owners(owners) - return self def add_tag(self, tag: Tag) -> "DataProductPatchBuilder": self._add_patch( diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index d5a1126065ad5e..b4833ce049f2cc 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -11,8 +11,6 @@ GlossaryTermAssociationClass as Term, GlossaryTermsClass as GlossaryTerms, KafkaAuditHeaderClass, - OwnerClass as Owner, - OwnershipTypeClass, SchemaMetadataClass, SystemMetadataClass, TagAssociationClass as Tag, @@ -20,7 +18,7 @@ UpstreamLineageClass as UpstreamLineage, ) from datahub.specific.custom_properties import CustomPropertiesPatchHelper -from datahub.specific.ownership import OwnershipPatchHelper +from datahub.specific.ownership import HasOwnershipPatch from datahub.specific.structured_properties import StructuredPropertiesPatchHelper from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import Urn @@ -94,7 +92,7 @@ def parent(self) -> _Parent: return self._parent -class DatasetPatchBuilder(MetadataPatchProposal): +class DatasetPatchBuilder(HasOwnershipPatch, MetadataPatchProposal): def __init__( self, urn: str, @@ -107,26 +105,8 @@ def __init__( self.custom_properties_patch_helper = CustomPropertiesPatchHelper( self, DatasetProperties.ASPECT_NAME ) - self.ownership_patch_helper = OwnershipPatchHelper(self) self.structured_properties_patch_helper = StructuredPropertiesPatchHelper(self) - def add_owner(self, owner: Owner) -> "DatasetPatchBuilder": - self.ownership_patch_helper.add_owner(owner) - return self - - def remove_owner( - self, owner: str, owner_type: Optional[OwnershipTypeClass] = None - ) -> "DatasetPatchBuilder": - """ - param: owner_type is optional - """ - self.ownership_patch_helper.remove_owner(owner, owner_type) - return self - - def set_owners(self, owners: List[Owner]) -> "DatasetPatchBuilder": - self.ownership_patch_helper.set_owners(owners) - return self - def add_upstream_lineage(self, upstream: Upstream) -> "DatasetPatchBuilder": self._add_patch( UpstreamLineage.ASPECT_NAME, diff --git a/metadata-ingestion/src/datahub/specific/form.py b/metadata-ingestion/src/datahub/specific/form.py index 04fdc77e8e65c3..e8574dc90932a5 100644 --- a/metadata-ingestion/src/datahub/specific/form.py +++ b/metadata-ingestion/src/datahub/specific/form.py @@ -5,15 +5,13 @@ FormInfoClass as FormInfo, FormPromptClass, KafkaAuditHeaderClass, - OwnerClass as Owner, - OwnershipTypeClass, SystemMetadataClass, ) -from datahub.specific.ownership import OwnershipPatchHelper +from datahub.specific.ownership import HasOwnershipPatch from datahub.utilities.urns.urn import Urn -class FormPatchBuilder(MetadataPatchProposal): +class FormPatchBuilder(HasOwnershipPatch, MetadataPatchProposal): def __init__( self, urn: str, @@ -23,24 +21,6 @@ def __init__( super().__init__( urn, system_metadata=system_metadata, audit_header=audit_header ) - self.ownership_patch_helper = OwnershipPatchHelper(self) - - def add_owner(self, owner: Owner) -> "FormPatchBuilder": - self.ownership_patch_helper.add_owner(owner) - return self - - def remove_owner( - self, owner: str, owner_type: Optional[OwnershipTypeClass] = None - ) -> "FormPatchBuilder": - """ - param: owner_type is optional - """ - self.ownership_patch_helper.remove_owner(owner, owner_type) - return self - - def set_owners(self, owners: List[Owner]) -> "FormPatchBuilder": - self.ownership_patch_helper.set_owners(owners) - return self def set_name(self, name: Optional[str] = None) -> "FormPatchBuilder": if name is not None: diff --git a/metadata-ingestion/src/datahub/specific/ownership.py b/metadata-ingestion/src/datahub/specific/ownership.py index abeb8ea061ba94..1e2c789c7def35 100644 --- a/metadata-ingestion/src/datahub/specific/ownership.py +++ b/metadata-ingestion/src/datahub/specific/ownership.py @@ -1,4 +1,6 @@ -from typing import Generic, List, Optional, TypeVar +from typing import List, Optional + +from typing_extensions import Self from datahub.emitter.mcp_patch_builder import MetadataPatchProposal from datahub.metadata.schema_classes import ( @@ -7,19 +9,18 @@ OwnershipTypeClass, ) -_Parent = TypeVar("_Parent", bound=MetadataPatchProposal) - -class OwnershipPatchHelper(Generic[_Parent]): - def __init__(self, parent: _Parent) -> None: - self._parent = parent - self.aspect_field = OwnershipClass.ASPECT_NAME +class HasOwnershipPatch(MetadataPatchProposal): + def add_owner(self, owner: OwnerClass) -> Self: + """Add an owner to the entity. - def parent(self) -> _Parent: - return self._parent + Args: + owner: The Owner object to add. - def add_owner(self, owner: OwnerClass) -> "OwnershipPatchHelper": - self._parent._add_patch( + Returns: + The patch builder instance. + """ + self._add_patch( OwnershipClass.ASPECT_NAME, "add", path=("owners", owner.owner, str(owner.type)), @@ -29,11 +30,19 @@ def add_owner(self, owner: OwnerClass) -> "OwnershipPatchHelper": def remove_owner( self, owner: str, owner_type: Optional[OwnershipTypeClass] = None - ) -> "OwnershipPatchHelper": - """ - param: owner_type is optional + ) -> Self: + """Remove an owner from the entity. + + If owner_type is not provided, the owner will be removed regardless of ownership type. + + Args: + owner: The owner to remove. + owner_type: The ownership type of the owner (optional). + + Returns: + The patch builder instance. """ - self._parent._add_patch( + self._add_patch( OwnershipClass.ASPECT_NAME, "remove", path=("owners", owner) + ((str(owner_type),) if owner_type else ()), @@ -41,8 +50,18 @@ def remove_owner( ) return self - def set_owners(self, owners: List[OwnerClass]) -> "OwnershipPatchHelper": - self._parent._add_patch( + def set_owners(self, owners: List[OwnerClass]) -> Self: + """Set the owners of the entity. + + This will effectively replace all existing owners with the new list - it doesn't really patch things. + + Args: + owners: The list of owners to set. + + Returns: + The patch builder instance. + """ + self._add_patch( OwnershipClass.ASPECT_NAME, "add", path=("owners",), value=owners ) return self From 5221c9fd2f31def8c58dedfcbfaaa6c97d153fa6 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 31 Dec 2024 12:24:48 -0500 Subject: [PATCH 04/11] add supportstoobj --- .../src/datahub/emitter/mcp_patch_builder.py | 22 ++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py index 221b7bac0a5628..17026a4114c128 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py @@ -2,7 +2,17 @@ import time from collections import defaultdict from dataclasses import dataclass -from typing import Any, Dict, List, Literal, Optional, Tuple, Union +from typing import ( + Any, + Dict, + List, + Literal, + Optional, + Protocol, + Tuple, + Union, + runtime_checkable, +) from typing_extensions import LiteralString @@ -21,10 +31,16 @@ from datahub.utilities.urns.urn import guess_entity_type +@runtime_checkable +class SupportsToObj(Protocol): + def to_obj(self) -> Any: + ... + + def _recursive_to_obj(obj: Any) -> Any: if isinstance(obj, list): return [_recursive_to_obj(v) for v in obj] - elif hasattr(obj, "to_obj"): + elif isinstance(obj, SupportsToObj): return obj.to_obj() else: return obj @@ -35,7 +51,7 @@ def _recursive_to_obj(obj: Any) -> Any: @dataclass -class _Patch: +class _Patch(SupportsToObj): op: PatchOp path: PatchPath value: Any From 060a50f905f8e6013d7a7fb58de8032eb0899a22 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 31 Dec 2024 13:03:43 -0500 Subject: [PATCH 05/11] use assert never --- metadata-ingestion/src/datahub/emitter/mce_builder.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index 110624aa61cb89..f095fffbaea6b4 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -24,6 +24,7 @@ import typing_inspect from avrogen.dict_wrapper import DictWrapper +from typing_extensions import assert_never from datahub.emitter.enum_helpers import get_enum_options from datahub.metadata.schema_classes import ( @@ -269,9 +270,8 @@ def make_owner_urn(owner: str, owner_type: OwnerType) -> str: return make_user_urn(owner) elif owner_type == OwnerType.GROUP: return make_group_urn(owner) - # This should pretty much never happen. - # TODO: With Python 3.11, we can use typing.assert_never() here. - return f"urn:li:{owner_type.value}:{owner}" + else: + assert_never(owner_type) def make_ownership_type_urn(type: str) -> str: From 14b3fbee94d075a160b217a2832038c01dad0057 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 31 Dec 2024 13:19:31 -0500 Subject: [PATCH 06/11] start moving to mixin helpers --- .../specific/aspect_helpers/__init__.py | 0 .../aspect_helpers/custom_properties.py | 81 +++++++++++++++++++ .../{ => aspect_helpers}/ownership.py | 0 .../src/datahub/specific/chart.py | 69 +++------------- .../src/datahub/specific/custom_properties.py | 37 --------- .../src/datahub/specific/dashboard.py | 78 +++--------------- .../src/datahub/specific/datajob.py | 69 +++------------- .../src/datahub/specific/dataproduct.py | 38 +++------ .../src/datahub/specific/dataset.py | 44 +++------- .../src/datahub/specific/form.py | 2 +- 10 files changed, 136 insertions(+), 282 deletions(-) create mode 100644 metadata-ingestion/src/datahub/specific/aspect_helpers/__init__.py create mode 100644 metadata-ingestion/src/datahub/specific/aspect_helpers/custom_properties.py rename metadata-ingestion/src/datahub/specific/{ => aspect_helpers}/ownership.py (100%) delete mode 100644 metadata-ingestion/src/datahub/specific/custom_properties.py diff --git a/metadata-ingestion/src/datahub/specific/aspect_helpers/__init__.py b/metadata-ingestion/src/datahub/specific/aspect_helpers/__init__.py new file mode 100644 index 00000000000000..e69de29bb2d1d6 diff --git a/metadata-ingestion/src/datahub/specific/aspect_helpers/custom_properties.py b/metadata-ingestion/src/datahub/specific/aspect_helpers/custom_properties.py new file mode 100644 index 00000000000000..dc8644b810312c --- /dev/null +++ b/metadata-ingestion/src/datahub/specific/aspect_helpers/custom_properties.py @@ -0,0 +1,81 @@ +from abc import abstractmethod +from typing import Dict, Optional, Tuple, TypeVar + +from typing_extensions import Self + +from datahub.emitter.mcp_patch_builder import MetadataPatchProposal, PatchPath + +_Parent = TypeVar("_Parent", bound=MetadataPatchProposal) + + +class HasCustomPropertiesPatch(MetadataPatchProposal): + @classmethod + @abstractmethod + def _custom_properties_location(self) -> Tuple[str, PatchPath]: + ... + + def add_custom_property(self, key: str, value: str) -> Self: + """Add a custom property to the entity. + + Args: + key: The key of the custom property. + value: The value of the custom property. + + Returns: + The patch builder instance. + """ + aspect_name, path = self._custom_properties_location() + self._add_patch( + aspect_name, + "add", + path=(*path, key), + value=value, + ) + return self + + def add_custom_properties( + self, custom_properties: Optional[Dict[str, str]] = None + ) -> Self: + if custom_properties is not None: + for key, value in custom_properties.items(): + self.add_custom_property(key, value) + return self + + def remove_custom_property(self, key: str) -> Self: + """Remove a custom property from the entity. + + Args: + key: The key of the custom property to remove. + + Returns: + The patch builder instance. + """ + aspect_name, path = self._custom_properties_location() + self._add_patch( + aspect_name, + "remove", + path=(*path, key), + value={}, + ) + return self + + def set_custom_properties(self, custom_properties: Dict[str, str]) -> Self: + """Sets the custom properties of the entity. + + This method replaces all existing custom properties with the given dictionary. + + Args: + custom_properties: A dictionary containing the custom properties to be set. + + Returns: + The patch builder instance. + """ + + aspect_name, path = self._custom_properties_location() + self._add_patch( + aspect_name, + "add", + path=path, + value=custom_properties, + ) + return self diff --git a/metadata-ingestion/src/datahub/specific/ownership.py b/metadata-ingestion/src/datahub/specific/aspect_helpers/ownership.py similarity index 100% rename from metadata-ingestion/src/datahub/specific/ownership.py rename to metadata-ingestion/src/datahub/specific/aspect_helpers/ownership.py diff --git a/metadata-ingestion/src/datahub/specific/chart.py b/metadata-ingestion/src/datahub/specific/chart.py index a1d399ae4d6206..4f798fadafd40d 100644 --- a/metadata-ingestion/src/datahub/specific/chart.py +++ b/metadata-ingestion/src/datahub/specific/chart.py @@ -1,6 +1,6 @@ -from typing import Dict, List, Optional, Union +from typing import List, Optional, Tuple, Union -from datahub.emitter.mcp_patch_builder import MetadataPatchProposal +from datahub.emitter.mcp_patch_builder import MetadataPatchProposal, PatchPath from datahub.metadata.schema_classes import ( AccessLevelClass, ChangeAuditStampsClass, @@ -14,13 +14,15 @@ SystemMetadataClass, TagAssociationClass as Tag, ) -from datahub.specific.custom_properties import CustomPropertiesPatchHelper -from datahub.specific.ownership import HasOwnershipPatch +from datahub.specific.aspect_helpers.custom_properties import HasCustomPropertiesPatch +from datahub.specific.aspect_helpers.ownership import HasOwnershipPatch from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import Urn -class ChartPatchBuilder(HasOwnershipPatch, MetadataPatchProposal): +class ChartPatchBuilder( + HasOwnershipPatch, HasCustomPropertiesPatch, MetadataPatchProposal +): def __init__( self, urn: str, @@ -38,9 +40,10 @@ def __init__( super().__init__( urn, system_metadata=system_metadata, audit_header=audit_header ) - self.custom_properties_patch_helper = CustomPropertiesPatchHelper( - self, ChartInfo.ASPECT_NAME - ) + + @classmethod + def _custom_properties_location(cls) -> Tuple[str, PatchPath]: + return ChartInfo.ASPECT_NAME, ("customProperties",) def add_input_edge(self, input: Union[Edge, Urn, str]) -> "ChartPatchBuilder": """ @@ -178,56 +181,6 @@ def remove_term(self, term: Union[str, Urn]) -> "ChartPatchBuilder": ) return self - def set_custom_properties( - self, custom_properties: Dict[str, str] - ) -> "ChartPatchBuilder": - """ - Sets the custom properties for the ChartPatchBuilder. - - Args: - custom_properties: A dictionary containing the custom properties to be set. - - Returns: - The ChartPatchBuilder instance. - - Notes: - This method replaces all existing custom properties with the given dictionary. - """ - self._add_patch( - ChartInfo.ASPECT_NAME, - "add", - path=("customProperties",), - value=custom_properties, - ) - return self - - def add_custom_property(self, key: str, value: str) -> "ChartPatchBuilder": - """ - Adds a custom property to the ChartPatchBuilder. - - Args: - key: The key of the custom property. - value: The value of the custom property. - - Returns: - The ChartPatchBuilder instance. - """ - self.custom_properties_patch_helper.add_property(key, value) - return self - - def remove_custom_property(self, key: str) -> "ChartPatchBuilder": - """ - Removes a custom property from the ChartPatchBuilder. - - Args: - key: The key of the custom property to remove. - - Returns: - The ChartPatchBuilder instance. - """ - self.custom_properties_patch_helper.remove_property(key) - return self - def set_title(self, title: str) -> "ChartPatchBuilder": assert title, "ChartInfo title should not be None" self._add_patch( diff --git a/metadata-ingestion/src/datahub/specific/custom_properties.py b/metadata-ingestion/src/datahub/specific/custom_properties.py deleted file mode 100644 index c1d46b440b39f1..00000000000000 --- a/metadata-ingestion/src/datahub/specific/custom_properties.py +++ /dev/null @@ -1,37 +0,0 @@ -from typing import Generic, TypeVar - -from datahub.emitter.mcp_patch_builder import MetadataPatchProposal - -_Parent = TypeVar("_Parent", bound=MetadataPatchProposal) - - -class CustomPropertiesPatchHelper(Generic[_Parent]): - def __init__( - self, - parent: _Parent, - aspect_name: str, - ) -> None: - self.aspect_name = aspect_name - self._parent = parent - self.aspect_field = "customProperties" - - def parent(self) -> _Parent: - return self._parent - - def add_property(self, key: str, value: str) -> "CustomPropertiesPatchHelper": - self._parent._add_patch( - self.aspect_name, - "add", - path=(self.aspect_field, key), - value=value, - ) - return self - - def remove_property(self, key: str) -> "CustomPropertiesPatchHelper": - self._parent._add_patch( - self.aspect_name, - "remove", - path=(self.aspect_field, key), - value={}, - ) - return self diff --git a/metadata-ingestion/src/datahub/specific/dashboard.py b/metadata-ingestion/src/datahub/specific/dashboard.py index c07e756482fbfa..1f1201833d94be 100644 --- a/metadata-ingestion/src/datahub/specific/dashboard.py +++ b/metadata-ingestion/src/datahub/specific/dashboard.py @@ -1,6 +1,6 @@ -from typing import Dict, List, Optional, Union +from typing import List, Optional, Tuple, Union -from datahub.emitter.mcp_patch_builder import MetadataPatchProposal +from datahub.emitter.mcp_patch_builder import MetadataPatchProposal, PatchPath from datahub.metadata.schema_classes import ( AccessLevelClass, ChangeAuditStampsClass, @@ -13,13 +13,15 @@ SystemMetadataClass, TagAssociationClass as Tag, ) -from datahub.specific.custom_properties import CustomPropertiesPatchHelper -from datahub.specific.ownership import HasOwnershipPatch +from datahub.specific.aspect_helpers.custom_properties import HasCustomPropertiesPatch +from datahub.specific.aspect_helpers.ownership import HasOwnershipPatch from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import Urn -class DashboardPatchBuilder(HasOwnershipPatch, MetadataPatchProposal): +class DashboardPatchBuilder( + HasOwnershipPatch, HasCustomPropertiesPatch, MetadataPatchProposal +): def __init__( self, urn: str, @@ -37,9 +39,10 @@ def __init__( super().__init__( urn, system_metadata=system_metadata, audit_header=audit_header ) - self.custom_properties_patch_helper = CustomPropertiesPatchHelper( - self, DashboardInfo.ASPECT_NAME - ) + + @classmethod + def _custom_properties_location(cls) -> Tuple[str, PatchPath]: + return DashboardInfo.ASPECT_NAME, ("customProperties",) def add_dataset_edge( self, dataset: Union[Edge, Urn, str] @@ -271,56 +274,6 @@ def remove_term(self, term: Union[str, Urn]) -> "DashboardPatchBuilder": ) return self - def set_custom_properties( - self, custom_properties: Dict[str, str] - ) -> "DashboardPatchBuilder": - """ - Sets the custom properties for the DashboardPatchBuilder. - - Args: - custom_properties: A dictionary containing the custom properties to be set. - - Returns: - The DashboardPatchBuilder instance. - - Notes: - This method replaces all existing custom properties with the given dictionary. - """ - self._add_patch( - DashboardInfo.ASPECT_NAME, - "add", - path=("customProperties",), - value=custom_properties, - ) - return self - - def add_custom_property(self, key: str, value: str) -> "DashboardPatchBuilder": - """ - Adds a custom property to the DashboardPatchBuilder. - - Args: - key: The key of the custom property. - value: The value of the custom property. - - Returns: - The DashboardPatchBuilder instance. - """ - self.custom_properties_patch_helper.add_property(key, value) - return self - - def remove_custom_property(self, key: str) -> "DashboardPatchBuilder": - """ - Removes a custom property from the DashboardPatchBuilder. - - Args: - key: The key of the custom property to remove. - - Returns: - The DashboardPatchBuilder instance. - """ - self.custom_properties_patch_helper.remove_property(key) - return self - def set_title(self, title: str) -> "DashboardPatchBuilder": assert title, "DashboardInfo title should not be None" self._add_patch( @@ -343,15 +296,6 @@ def set_description(self, description: str) -> "DashboardPatchBuilder": return self - def add_custom_properties( - self, custom_properties: Optional[Dict[str, str]] = None - ) -> "DashboardPatchBuilder": - if custom_properties: - for key, value in custom_properties.items(): - self.custom_properties_patch_helper.add_property(key, value) - - return self - def set_external_url(self, external_url: Optional[str]) -> "DashboardPatchBuilder": if external_url: self._add_patch( diff --git a/metadata-ingestion/src/datahub/specific/datajob.py b/metadata-ingestion/src/datahub/specific/datajob.py index ff04957e134c99..0b98dd5f6a3bbe 100644 --- a/metadata-ingestion/src/datahub/specific/datajob.py +++ b/metadata-ingestion/src/datahub/specific/datajob.py @@ -1,6 +1,6 @@ -from typing import Dict, List, Optional, Union +from typing import List, Optional, Tuple, Union -from datahub.emitter.mcp_patch_builder import MetadataPatchProposal +from datahub.emitter.mcp_patch_builder import MetadataPatchProposal, PatchPath from datahub.metadata.schema_classes import ( DataJobInfoClass as DataJobInfo, DataJobInputOutputClass as DataJobInputOutput, @@ -13,11 +13,13 @@ TagAssociationClass as Tag, ) from datahub.metadata.urns import SchemaFieldUrn, TagUrn, Urn -from datahub.specific.custom_properties import CustomPropertiesPatchHelper -from datahub.specific.ownership import HasOwnershipPatch +from datahub.specific.aspect_helpers.custom_properties import HasCustomPropertiesPatch +from datahub.specific.aspect_helpers.ownership import HasOwnershipPatch -class DataJobPatchBuilder(HasOwnershipPatch, MetadataPatchProposal): +class DataJobPatchBuilder( + HasOwnershipPatch, HasCustomPropertiesPatch, MetadataPatchProposal +): def __init__( self, urn: str, @@ -35,9 +37,10 @@ def __init__( super().__init__( urn, system_metadata=system_metadata, audit_header=audit_header ) - self.custom_properties_patch_helper = CustomPropertiesPatchHelper( - self, DataJobInfo.ASPECT_NAME - ) + + @classmethod + def _custom_properties_location(cls) -> Tuple[str, PatchPath]: + return DataJobInfo.ASPECT_NAME, ("customProperties",) def add_input_datajob(self, input: Union[Edge, Urn, str]) -> "DataJobPatchBuilder": """ @@ -487,53 +490,3 @@ def remove_term(self, term: Union[str, Urn]) -> "DataJobPatchBuilder": GlossaryTerms.ASPECT_NAME, "remove", path=("terms", term), value={} ) return self - - def set_custom_properties( - self, custom_properties: Dict[str, str] - ) -> "DataJobPatchBuilder": - """ - Sets the custom properties for the DataJobPatchBuilder. - - Args: - custom_properties: A dictionary containing the custom properties to be set. - - Returns: - The DataJobPatchBuilder instance. - - Notes: - This method replaces all existing custom properties with the given dictionary. - """ - self._add_patch( - DataJobInfo.ASPECT_NAME, - "add", - path=("customProperties",), - value=custom_properties, - ) - return self - - def add_custom_property(self, key: str, value: str) -> "DataJobPatchBuilder": - """ - Adds a custom property to the DataJobPatchBuilder. - - Args: - key: The key of the custom property. - value: The value of the custom property. - - Returns: - The DataJobPatchBuilder instance. - """ - self.custom_properties_patch_helper.add_property(key, value) - return self - - def remove_custom_property(self, key: str) -> "DataJobPatchBuilder": - """ - Removes a custom property from the DataJobPatchBuilder. - - Args: - key: The key of the custom property to remove. - - Returns: - The DataJobPatchBuilder instance. - """ - self.custom_properties_patch_helper.remove_property(key) - return self diff --git a/metadata-ingestion/src/datahub/specific/dataproduct.py b/metadata-ingestion/src/datahub/specific/dataproduct.py index 0c49fd80d9922b..6f5ae452851130 100644 --- a/metadata-ingestion/src/datahub/specific/dataproduct.py +++ b/metadata-ingestion/src/datahub/specific/dataproduct.py @@ -1,6 +1,6 @@ -from typing import Dict, List, Optional, Union +from typing import List, Optional, Tuple, Union -from datahub.emitter.mcp_patch_builder import MetadataPatchProposal +from datahub.emitter.mcp_patch_builder import MetadataPatchProposal, PatchPath from datahub.metadata.schema_classes import ( DataProductAssociationClass as DataProductAssociation, DataProductPropertiesClass as DataProductProperties, @@ -11,13 +11,15 @@ SystemMetadataClass, TagAssociationClass as Tag, ) -from datahub.specific.custom_properties import CustomPropertiesPatchHelper -from datahub.specific.ownership import HasOwnershipPatch +from datahub.specific.aspect_helpers.custom_properties import HasCustomPropertiesPatch +from datahub.specific.aspect_helpers.ownership import HasOwnershipPatch from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import Urn -class DataProductPatchBuilder(HasOwnershipPatch, MetadataPatchProposal): +class DataProductPatchBuilder( + HasOwnershipPatch, HasCustomPropertiesPatch, MetadataPatchProposal +): def __init__( self, urn: str, @@ -29,9 +31,10 @@ def __init__( system_metadata=system_metadata, audit_header=audit_header, ) - self.custom_properties_patch_helper = CustomPropertiesPatchHelper( - self, DataProductProperties.ASPECT_NAME - ) + + @classmethod + def _custom_properties_location(cls) -> Tuple[str, PatchPath]: + return DataProductProperties.ASPECT_NAME, ("customProperties",) def add_tag(self, tag: Tag) -> "DataProductPatchBuilder": self._add_patch( @@ -77,25 +80,6 @@ def set_description(self, description: str) -> "DataProductPatchBuilder": ) return self - def set_custom_properties( - self, custom_properties: Dict[str, str] - ) -> "DataProductPatchBuilder": - self._add_patch( - DataProductProperties.ASPECT_NAME, - "add", - path=("customProperties",), - value=custom_properties, - ) - return self - - def add_custom_property(self, key: str, value: str) -> "DataProductPatchBuilder": - self.custom_properties_patch_helper.add_property(key, value) - return self - - def remove_custom_property(self, key: str) -> "DataProductPatchBuilder": - self.custom_properties_patch_helper.remove_property(key) - return self - def set_assets( self, assets: List[DataProductAssociation] ) -> "DataProductPatchBuilder": diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index b4833ce049f2cc..1f5c7c389e98f4 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -1,4 +1,4 @@ -from typing import Dict, Generic, List, Optional, Tuple, TypeVar, Union +from typing import Generic, List, Optional, Tuple, TypeVar, Union from datahub.emitter.mcp_patch_builder import MetadataPatchProposal, PatchPath from datahub.metadata.com.linkedin.pegasus2avro.common import TimeStamp @@ -17,8 +17,8 @@ UpstreamClass as Upstream, UpstreamLineageClass as UpstreamLineage, ) -from datahub.specific.custom_properties import CustomPropertiesPatchHelper -from datahub.specific.ownership import HasOwnershipPatch +from datahub.specific.aspect_helpers.custom_properties import HasCustomPropertiesPatch +from datahub.specific.aspect_helpers.ownership import HasOwnershipPatch from datahub.specific.structured_properties import StructuredPropertiesPatchHelper from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import Urn @@ -92,7 +92,9 @@ def parent(self) -> _Parent: return self._parent -class DatasetPatchBuilder(HasOwnershipPatch, MetadataPatchProposal): +class DatasetPatchBuilder( + HasOwnershipPatch, HasCustomPropertiesPatch, MetadataPatchProposal +): def __init__( self, urn: str, @@ -102,11 +104,12 @@ def __init__( super().__init__( urn, system_metadata=system_metadata, audit_header=audit_header ) - self.custom_properties_patch_helper = CustomPropertiesPatchHelper( - self, DatasetProperties.ASPECT_NAME - ) self.structured_properties_patch_helper = StructuredPropertiesPatchHelper(self) + @classmethod + def _custom_properties_location(cls) -> Tuple[str, PatchPath]: + return DatasetProperties.ASPECT_NAME, ("customProperties",) + def add_upstream_lineage(self, upstream: Upstream) -> "DatasetPatchBuilder": self._add_patch( UpstreamLineage.ASPECT_NAME, @@ -263,33 +266,6 @@ def set_description( ) return self - def set_custom_properties( - self, custom_properties: Dict[str, str] - ) -> "DatasetPatchBuilder": - self._add_patch( - DatasetProperties.ASPECT_NAME, - "add", - path=("customProperties",), - value=custom_properties, - ) - return self - - def add_custom_property(self, key: str, value: str) -> "DatasetPatchBuilder": - self.custom_properties_patch_helper.add_property(key, value) - return self - - def add_custom_properties( - self, custom_properties: Optional[Dict[str, str]] = None - ) -> "DatasetPatchBuilder": - if custom_properties is not None: - for key, value in custom_properties.items(): - self.custom_properties_patch_helper.add_property(key, value) - return self - - def remove_custom_property(self, key: str) -> "DatasetPatchBuilder": - self.custom_properties_patch_helper.remove_property(key) - return self - def set_display_name( self, display_name: Optional[str] = None ) -> "DatasetPatchBuilder": diff --git a/metadata-ingestion/src/datahub/specific/form.py b/metadata-ingestion/src/datahub/specific/form.py index e8574dc90932a5..281b3cac99b2c1 100644 --- a/metadata-ingestion/src/datahub/specific/form.py +++ b/metadata-ingestion/src/datahub/specific/form.py @@ -7,7 +7,7 @@ KafkaAuditHeaderClass, SystemMetadataClass, ) -from datahub.specific.ownership import HasOwnershipPatch +from datahub.specific.aspect_helpers.ownership import HasOwnershipPatch from datahub.utilities.urns.urn import Urn From 7549c9988c9b1885c020a3a89c5e2e6dd74ba690 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 31 Dec 2024 14:33:01 -0500 Subject: [PATCH 07/11] structured properties patch refactor --- .../aspect_helpers/custom_properties.py | 4 +- .../aspect_helpers/structured_properties.py | 72 +++++++++++++++++++ .../src/datahub/specific/dataset.py | 40 ++--------- .../datahub/specific/structured_properties.py | 53 -------------- 4 files changed, 80 insertions(+), 89 deletions(-) create mode 100644 metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py delete mode 100644 metadata-ingestion/src/datahub/specific/structured_properties.py diff --git a/metadata-ingestion/src/datahub/specific/aspect_helpers/custom_properties.py b/metadata-ingestion/src/datahub/specific/aspect_helpers/custom_properties.py index dc8644b810312c..1fd1585a913581 100644 --- a/metadata-ingestion/src/datahub/specific/aspect_helpers/custom_properties.py +++ b/metadata-ingestion/src/datahub/specific/aspect_helpers/custom_properties.py @@ -1,12 +1,10 @@ from abc import abstractmethod -from typing import Dict, Optional, Tuple, TypeVar +from typing import Dict, Optional, Tuple from typing_extensions import Self from datahub.emitter.mcp_patch_builder import MetadataPatchProposal, PatchPath -_Parent = TypeVar("_Parent", bound=MetadataPatchProposal) - class HasCustomPropertiesPatch(MetadataPatchProposal): @classmethod diff --git a/metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py b/metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py new file mode 100644 index 00000000000000..48050bbad8e50d --- /dev/null +++ b/metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py @@ -0,0 +1,72 @@ +from typing import List, Union + +from typing_extensions import Self + +from datahub.emitter.mcp_patch_builder import MetadataPatchProposal +from datahub.metadata.schema_classes import ( + StructuredPropertiesClass, + StructuredPropertyValueAssignmentClass, +) +from datahub.utilities.urns.structured_properties_urn import ( + make_structured_property_urn, +) + + +class HasStructuredPropertiesPatch(MetadataPatchProposal): + def set_structured_property( + self, key: str, value: Union[str, float, List[Union[str, float]]] + ) -> Self: + """Add or update a structured property. + + Args: + key: the name of the property (either bare or urn form) + value: the value of the property (for multi-valued properties, this can be a list) + + Returns: + The patch builder instance. + """ + self.remove_structured_property(key) + self.add_structured_property(key, value) + return self + + def remove_structured_property(self, key: str) -> Self: + """Remove a structured property. + + Args: + key: the name of the property (either bare or urn form) + + Returns: + The patch builder instance. + """ + + self._add_patch( + StructuredPropertiesClass.ASPECT_NAME, + "remove", + path=("properties", make_structured_property_urn(key)), + value={}, + ) + return self + + def add_structured_property( + self, key: str, value: Union[str, float, List[Union[str, float]]] + ) -> Self: + """Add a structured property. + + Args: + key: the name of the property (either bare or urn form) + value: the value of the property (for multi-valued properties, this value will be appended to the list) + + Returns: + The patch builder instance. + """ + + self._add_patch( + StructuredPropertiesClass.ASPECT_NAME, + "add", + path=("properties", make_structured_property_urn(key)), + value=StructuredPropertyValueAssignmentClass( + propertyUrn=make_structured_property_urn(key), + values=value if isinstance(value, list) else [value], + ), + ) + return self diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index 1f5c7c389e98f4..d8937998affc0a 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -19,7 +19,9 @@ ) from datahub.specific.aspect_helpers.custom_properties import HasCustomPropertiesPatch from datahub.specific.aspect_helpers.ownership import HasOwnershipPatch -from datahub.specific.structured_properties import StructuredPropertiesPatchHelper +from datahub.specific.aspect_helpers.structured_properties import ( + HasStructuredPropertiesPatch, +) from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import Urn @@ -93,7 +95,10 @@ def parent(self) -> _Parent: class DatasetPatchBuilder( - HasOwnershipPatch, HasCustomPropertiesPatch, MetadataPatchProposal + HasOwnershipPatch, + HasCustomPropertiesPatch, + HasStructuredPropertiesPatch, + MetadataPatchProposal, ): def __init__( self, @@ -104,7 +109,6 @@ def __init__( super().__init__( urn, system_metadata=system_metadata, audit_header=audit_header ) - self.structured_properties_patch_helper = StructuredPropertiesPatchHelper(self) @classmethod def _custom_properties_location(cls) -> Tuple[str, PatchPath]: @@ -313,33 +317,3 @@ def set_last_modified( value=timestamp, ) return self - - def set_structured_property( - self, property_name: str, value: Union[str, float, List[Union[str, float]]] - ) -> "DatasetPatchBuilder": - """ - This is a helper method to set a structured property. - @param property_name: the name of the property (either bare or urn form) - @param value: the value of the property (for multi-valued properties, this can be a list) - """ - self.structured_properties_patch_helper.set_property(property_name, value) - return self - - def add_structured_property( - self, property_name: str, value: Union[str, float] - ) -> "DatasetPatchBuilder": - """ - This is a helper method to add a structured property. - @param property_name: the name of the property (either bare or urn form) - @param value: the value of the property (for multi-valued properties, this value will be appended to the list) - """ - self.structured_properties_patch_helper.add_property(property_name, value) - return self - - def remove_structured_property(self, property_name: str) -> "DatasetPatchBuilder": - """ - This is a helper method to remove a structured property. - @param property_name: the name of the property (either bare or urn form) - """ - self.structured_properties_patch_helper.remove_property(property_name) - return self diff --git a/metadata-ingestion/src/datahub/specific/structured_properties.py b/metadata-ingestion/src/datahub/specific/structured_properties.py deleted file mode 100644 index 17d896249c4746..00000000000000 --- a/metadata-ingestion/src/datahub/specific/structured_properties.py +++ /dev/null @@ -1,53 +0,0 @@ -from typing import Generic, List, TypeVar, Union - -from datahub.emitter.mcp_patch_builder import MetadataPatchProposal -from datahub.metadata.schema_classes import StructuredPropertyValueAssignmentClass -from datahub.utilities.urns.structured_properties_urn import ( - make_structured_property_urn, -) - -_Parent = TypeVar("_Parent", bound=MetadataPatchProposal) - - -class StructuredPropertiesPatchHelper(Generic[_Parent]): - def __init__( - self, - parent: _Parent, - aspect_name: str = "structuredProperties", - ) -> None: - self.aspect_name = aspect_name - self._parent = parent - self.aspect_field = "properties" - - def parent(self) -> _Parent: - return self._parent - - def set_property( - self, key: str, value: Union[str, float, List[Union[str, float]]] - ) -> "StructuredPropertiesPatchHelper": - self.remove_property(key) - self.add_property(key, value) - return self - - def remove_property(self, key: str) -> "StructuredPropertiesPatchHelper": - self._parent._add_patch( - self.aspect_name, - "remove", - path=(self.aspect_field, make_structured_property_urn(key)), - value={}, - ) - return self - - def add_property( - self, key: str, value: Union[str, float, List[Union[str, float]]] - ) -> "StructuredPropertiesPatchHelper": - self._parent._add_patch( - self.aspect_name, - "add", - path=(self.aspect_field, make_structured_property_urn(key)), - value=StructuredPropertyValueAssignmentClass( - propertyUrn=make_structured_property_urn(key), - values=value if isinstance(value, list) else [value], - ), - ) - return self From 68573cb71a32a669f0a7b9e7897dbad8887cf6e8 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 31 Dec 2024 15:25:38 -0500 Subject: [PATCH 08/11] refactor tags interface --- .../datahub/specific/aspect_helpers/tags.py | 42 +++++++++++++++++++ .../src/datahub/specific/chart.py | 36 +--------------- .../src/datahub/specific/dashboard.py | 36 +--------------- .../src/datahub/specific/datajob.py | 37 ++-------------- .../src/datahub/specific/dataproduct.py | 18 +------- .../src/datahub/specific/dataset.py | 15 +------ 6 files changed, 53 insertions(+), 131 deletions(-) create mode 100644 metadata-ingestion/src/datahub/specific/aspect_helpers/tags.py diff --git a/metadata-ingestion/src/datahub/specific/aspect_helpers/tags.py b/metadata-ingestion/src/datahub/specific/aspect_helpers/tags.py new file mode 100644 index 00000000000000..afbc9115ca6e2b --- /dev/null +++ b/metadata-ingestion/src/datahub/specific/aspect_helpers/tags.py @@ -0,0 +1,42 @@ +from typing import Union + +from typing_extensions import Self + +from datahub.emitter.mcp_patch_builder import MetadataPatchProposal +from datahub.metadata.schema_classes import ( + GlobalTagsClass as GlobalTags, + TagAssociationClass as Tag, +) +from datahub.metadata.urns import TagUrn, Urn + + +class HasTagsPatch(MetadataPatchProposal): + def add_tag(self, tag: Tag) -> Self: + """Adds a tag to the entity. + + Args: + tag: The Tag object representing the tag to be added. + + Returns: + The patch builder instance. + """ + + # TODO: Make this support raw strings, in addition to Tag objects. + self._add_patch( + GlobalTags.ASPECT_NAME, "add", path=("tags", tag.tag), value=tag + ) + return self + + def remove_tag(self, tag: Union[str, Urn]) -> Self: + """Removes a tag from the entity. + + Args: + tag: The tag to remove, specified as a string or Urn object. + + Returns: + The patch builder instance. + """ + if isinstance(tag, str) and not tag.startswith("urn:li:tag:"): + tag = TagUrn.create_from_id(tag) + self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=("tags", tag), value={}) + return self diff --git a/metadata-ingestion/src/datahub/specific/chart.py b/metadata-ingestion/src/datahub/specific/chart.py index 4f798fadafd40d..287cc0cbce739c 100644 --- a/metadata-ingestion/src/datahub/specific/chart.py +++ b/metadata-ingestion/src/datahub/specific/chart.py @@ -7,21 +7,19 @@ ChartInfoClass as ChartInfo, ChartTypeClass, EdgeClass as Edge, - GlobalTagsClass as GlobalTags, GlossaryTermAssociationClass as Term, GlossaryTermsClass as GlossaryTerms, KafkaAuditHeaderClass, SystemMetadataClass, - TagAssociationClass as Tag, ) from datahub.specific.aspect_helpers.custom_properties import HasCustomPropertiesPatch from datahub.specific.aspect_helpers.ownership import HasOwnershipPatch -from datahub.utilities.urns.tag_urn import TagUrn +from datahub.specific.aspect_helpers.tags import HasTagsPatch from datahub.utilities.urns.urn import Urn class ChartPatchBuilder( - HasOwnershipPatch, HasCustomPropertiesPatch, MetadataPatchProposal + HasOwnershipPatch, HasCustomPropertiesPatch, HasTagsPatch, MetadataPatchProposal ): def __init__( self, @@ -119,36 +117,6 @@ def set_input_edges(self, inputs: List[Edge]) -> "ChartPatchBuilder": ) return self - def add_tag(self, tag: Tag) -> "ChartPatchBuilder": - """ - Adds a tag to the ChartPatchBuilder. - - Args: - tag: The Tag object representing the tag to be added. - - Returns: - The ChartPatchBuilder instance. - """ - self._add_patch( - GlobalTags.ASPECT_NAME, "add", path=("tags", tag.tag), value=tag - ) - return self - - def remove_tag(self, tag: Union[str, Urn]) -> "ChartPatchBuilder": - """ - Removes a tag from the ChartPatchBuilder. - - Args: - tag: The tag to remove, specified as a string or Urn object. - - Returns: - The ChartPatchBuilder instance. - """ - if isinstance(tag, str) and not tag.startswith("urn:li:tag:"): - tag = TagUrn.create_from_id(tag) - self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=("tags", tag), value={}) - return self - def add_term(self, term: Term) -> "ChartPatchBuilder": """ Adds a glossary term to the ChartPatchBuilder. diff --git a/metadata-ingestion/src/datahub/specific/dashboard.py b/metadata-ingestion/src/datahub/specific/dashboard.py index 1f1201833d94be..f6a4051a9579a8 100644 --- a/metadata-ingestion/src/datahub/specific/dashboard.py +++ b/metadata-ingestion/src/datahub/specific/dashboard.py @@ -6,21 +6,19 @@ ChangeAuditStampsClass, DashboardInfoClass as DashboardInfo, EdgeClass as Edge, - GlobalTagsClass as GlobalTags, GlossaryTermAssociationClass as Term, GlossaryTermsClass as GlossaryTerms, KafkaAuditHeaderClass, SystemMetadataClass, - TagAssociationClass as Tag, ) from datahub.specific.aspect_helpers.custom_properties import HasCustomPropertiesPatch from datahub.specific.aspect_helpers.ownership import HasOwnershipPatch -from datahub.utilities.urns.tag_urn import TagUrn +from datahub.specific.aspect_helpers.tags import HasTagsPatch from datahub.utilities.urns.urn import Urn class DashboardPatchBuilder( - HasOwnershipPatch, HasCustomPropertiesPatch, MetadataPatchProposal + HasOwnershipPatch, HasCustomPropertiesPatch, HasTagsPatch, MetadataPatchProposal ): def __init__( self, @@ -212,36 +210,6 @@ def set_chart_edges(self, charts: List[Edge]) -> "DashboardPatchBuilder": ) return self - def add_tag(self, tag: Tag) -> "DashboardPatchBuilder": - """ - Adds a tag to the DashboardPatchBuilder. - - Args: - tag: The Tag object representing the tag to be added. - - Returns: - The DashboardPatchBuilder instance. - """ - self._add_patch( - GlobalTags.ASPECT_NAME, "add", path=("tags", tag.tag), value=tag - ) - return self - - def remove_tag(self, tag: Union[str, Urn]) -> "DashboardPatchBuilder": - """ - Removes a tag from the DashboardPatchBuilder. - - Args: - tag: The tag to remove, specified as a string or Urn object. - - Returns: - The DashboardPatchBuilder instance. - """ - if isinstance(tag, str) and not tag.startswith("urn:li:tag:"): - tag = TagUrn.create_from_id(tag) - self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=("tags", tag), value={}) - return self - def add_term(self, term: Term) -> "DashboardPatchBuilder": """ Adds a glossary term to the DashboardPatchBuilder. diff --git a/metadata-ingestion/src/datahub/specific/datajob.py b/metadata-ingestion/src/datahub/specific/datajob.py index 0b98dd5f6a3bbe..bd3baeb1e79fa3 100644 --- a/metadata-ingestion/src/datahub/specific/datajob.py +++ b/metadata-ingestion/src/datahub/specific/datajob.py @@ -5,20 +5,19 @@ DataJobInfoClass as DataJobInfo, DataJobInputOutputClass as DataJobInputOutput, EdgeClass as Edge, - GlobalTagsClass as GlobalTags, GlossaryTermAssociationClass as Term, GlossaryTermsClass as GlossaryTerms, KafkaAuditHeaderClass, SystemMetadataClass, - TagAssociationClass as Tag, ) -from datahub.metadata.urns import SchemaFieldUrn, TagUrn, Urn +from datahub.metadata.urns import SchemaFieldUrn, Urn from datahub.specific.aspect_helpers.custom_properties import HasCustomPropertiesPatch from datahub.specific.aspect_helpers.ownership import HasOwnershipPatch +from datahub.specific.aspect_helpers.tags import HasTagsPatch class DataJobPatchBuilder( - HasOwnershipPatch, HasCustomPropertiesPatch, MetadataPatchProposal + HasOwnershipPatch, HasCustomPropertiesPatch, HasTagsPatch, MetadataPatchProposal ): def __init__( self, @@ -429,36 +428,6 @@ def set_output_dataset_fields(self, outputs: List[Edge]) -> "DataJobPatchBuilder ) return self - def add_tag(self, tag: Tag) -> "DataJobPatchBuilder": - """ - Adds a tag to the DataJobPatchBuilder. - - Args: - tag: The Tag object representing the tag to be added. - - Returns: - The DataJobPatchBuilder instance. - """ - self._add_patch( - GlobalTags.ASPECT_NAME, "add", path=("tags", tag.tag), value=tag - ) - return self - - def remove_tag(self, tag: Union[str, Urn]) -> "DataJobPatchBuilder": - """ - Removes a tag from the DataJobPatchBuilder. - - Args: - tag: The tag to remove, specified as a string or Urn object. - - Returns: - The DataJobPatchBuilder instance. - """ - if isinstance(tag, str) and not tag.startswith("urn:li:tag:"): - tag = TagUrn.create_from_id(tag) - self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=("tags", tag), value={}) - return self - def add_term(self, term: Term) -> "DataJobPatchBuilder": """ Adds a glossary term to the DataJobPatchBuilder. diff --git a/metadata-ingestion/src/datahub/specific/dataproduct.py b/metadata-ingestion/src/datahub/specific/dataproduct.py index 6f5ae452851130..9734214a0b5187 100644 --- a/metadata-ingestion/src/datahub/specific/dataproduct.py +++ b/metadata-ingestion/src/datahub/specific/dataproduct.py @@ -4,21 +4,19 @@ from datahub.metadata.schema_classes import ( DataProductAssociationClass as DataProductAssociation, DataProductPropertiesClass as DataProductProperties, - GlobalTagsClass as GlobalTags, GlossaryTermAssociationClass as Term, GlossaryTermsClass as GlossaryTerms, KafkaAuditHeaderClass, SystemMetadataClass, - TagAssociationClass as Tag, ) from datahub.specific.aspect_helpers.custom_properties import HasCustomPropertiesPatch from datahub.specific.aspect_helpers.ownership import HasOwnershipPatch -from datahub.utilities.urns.tag_urn import TagUrn +from datahub.specific.aspect_helpers.tags import HasTagsPatch from datahub.utilities.urns.urn import Urn class DataProductPatchBuilder( - HasOwnershipPatch, HasCustomPropertiesPatch, MetadataPatchProposal + HasOwnershipPatch, HasCustomPropertiesPatch, HasTagsPatch, MetadataPatchProposal ): def __init__( self, @@ -36,18 +34,6 @@ def __init__( def _custom_properties_location(cls) -> Tuple[str, PatchPath]: return DataProductProperties.ASPECT_NAME, ("customProperties",) - def add_tag(self, tag: Tag) -> "DataProductPatchBuilder": - self._add_patch( - GlobalTags.ASPECT_NAME, "add", path=("tags", tag.tag), value=tag - ) - return self - - def remove_tag(self, tag: Union[str, Urn]) -> "DataProductPatchBuilder": - if isinstance(tag, str) and not tag.startswith("urn:li:tag:"): - tag = TagUrn.create_from_id(tag) - self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=("tags", tag), value={}) - return self - def add_term(self, term: Term) -> "DataProductPatchBuilder": self._add_patch( GlossaryTerms.ASPECT_NAME, "add", path=("terms", term.urn), value=term diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index d8937998affc0a..4d52d99e822e78 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -7,7 +7,6 @@ EditableDatasetPropertiesClass as EditableDatasetProperties, EditableSchemaMetadataClass as EditableSchemaMetadata, FineGrainedLineageClass as FineGrainedLineage, - GlobalTagsClass as GlobalTags, GlossaryTermAssociationClass as Term, GlossaryTermsClass as GlossaryTerms, KafkaAuditHeaderClass, @@ -22,6 +21,7 @@ from datahub.specific.aspect_helpers.structured_properties import ( HasStructuredPropertiesPatch, ) +from datahub.specific.aspect_helpers.tags import HasTagsPatch from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import Urn @@ -98,6 +98,7 @@ class DatasetPatchBuilder( HasOwnershipPatch, HasCustomPropertiesPatch, HasStructuredPropertiesPatch, + HasTagsPatch, MetadataPatchProposal, ): def __init__( @@ -213,18 +214,6 @@ def set_fine_grained_upstream_lineages( ) return self - def add_tag(self, tag: Tag) -> "DatasetPatchBuilder": - self._add_patch( - GlobalTags.ASPECT_NAME, "add", path=("tags", tag.tag), value=tag - ) - return self - - def remove_tag(self, tag: Union[str, Urn]) -> "DatasetPatchBuilder": - if isinstance(tag, str) and not tag.startswith("urn:li:tag:"): - tag = TagUrn.create_from_id(tag) - self._add_patch(GlobalTags.ASPECT_NAME, "remove", path=("tags", tag), value={}) - return self - def add_term(self, term: Term) -> "DatasetPatchBuilder": self._add_patch( GlossaryTerms.ASPECT_NAME, "add", path=("terms", term.urn), value=term From f7dfa644e1aca1090267c63c6c0519ea0e74031f Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Tue, 31 Dec 2024 17:04:40 -0500 Subject: [PATCH 09/11] refactor terms patch --- .../datahub/specific/aspect_helpers/terms.py | 43 +++++++++++++++++++ .../src/datahub/specific/chart.py | 41 +++--------------- .../src/datahub/specific/dashboard.py | 41 +++--------------- .../src/datahub/specific/datajob.py | 41 +++--------------- .../src/datahub/specific/dataproduct.py | 26 +++-------- .../src/datahub/specific/dataset.py | 17 +------- 6 files changed, 70 insertions(+), 139 deletions(-) create mode 100644 metadata-ingestion/src/datahub/specific/aspect_helpers/terms.py diff --git a/metadata-ingestion/src/datahub/specific/aspect_helpers/terms.py b/metadata-ingestion/src/datahub/specific/aspect_helpers/terms.py new file mode 100644 index 00000000000000..ae199124372b40 --- /dev/null +++ b/metadata-ingestion/src/datahub/specific/aspect_helpers/terms.py @@ -0,0 +1,43 @@ +from typing import Union + +from typing_extensions import Self + +from datahub.emitter.mcp_patch_builder import MetadataPatchProposal +from datahub.metadata.schema_classes import ( + GlossaryTermAssociationClass as Term, + GlossaryTermsClass, +) +from datahub.metadata.urns import GlossaryTermUrn, Urn + + +class HasTermsPatch(MetadataPatchProposal): + def add_term(self, term: Term) -> Self: + """Adds a glossary term to the entity. + + Args: + term: The Term object representing the glossary term to be added. + + Returns: + The patch builder instance. + """ + # TODO: Make this support raw strings, in addition to Term objects. + self._add_patch( + GlossaryTermsClass.ASPECT_NAME, "add", path=("terms", term.urn), value=term + ) + return self + + def remove_term(self, term: Union[str, Urn]) -> Self: + """Removes a glossary term from the entity. + + Args: + term: The term to remove, specified as a string or Urn object. + + Returns: + The patch builder instance. + """ + if isinstance(term, str) and not term.startswith("urn:li:glossaryTerm:"): + term = GlossaryTermUrn(term) + self._add_patch( + GlossaryTermsClass.ASPECT_NAME, "remove", path=("terms", term), value={} + ) + return self diff --git a/metadata-ingestion/src/datahub/specific/chart.py b/metadata-ingestion/src/datahub/specific/chart.py index 287cc0cbce739c..f44a2ffc0d68ab 100644 --- a/metadata-ingestion/src/datahub/specific/chart.py +++ b/metadata-ingestion/src/datahub/specific/chart.py @@ -7,19 +7,22 @@ ChartInfoClass as ChartInfo, ChartTypeClass, EdgeClass as Edge, - GlossaryTermAssociationClass as Term, - GlossaryTermsClass as GlossaryTerms, KafkaAuditHeaderClass, SystemMetadataClass, ) from datahub.specific.aspect_helpers.custom_properties import HasCustomPropertiesPatch from datahub.specific.aspect_helpers.ownership import HasOwnershipPatch from datahub.specific.aspect_helpers.tags import HasTagsPatch +from datahub.specific.aspect_helpers.terms import HasTermsPatch from datahub.utilities.urns.urn import Urn class ChartPatchBuilder( - HasOwnershipPatch, HasCustomPropertiesPatch, HasTagsPatch, MetadataPatchProposal + HasOwnershipPatch, + HasCustomPropertiesPatch, + HasTagsPatch, + HasTermsPatch, + MetadataPatchProposal, ): def __init__( self, @@ -117,38 +120,6 @@ def set_input_edges(self, inputs: List[Edge]) -> "ChartPatchBuilder": ) return self - def add_term(self, term: Term) -> "ChartPatchBuilder": - """ - Adds a glossary term to the ChartPatchBuilder. - - Args: - term: The Term object representing the glossary term to be added. - - Returns: - The ChartPatchBuilder instance. - """ - self._add_patch( - GlossaryTerms.ASPECT_NAME, "add", path=("terms", term.urn), value=term - ) - return self - - def remove_term(self, term: Union[str, Urn]) -> "ChartPatchBuilder": - """ - Removes a glossary term from the ChartPatchBuilder. - - Args: - term: The term to remove, specified as a string or Urn object. - - Returns: - The ChartPatchBuilder instance. - """ - if isinstance(term, str) and not term.startswith("urn:li:glossaryTerm:"): - term = "urn:li:glossaryTerm:" + term - self._add_patch( - GlossaryTerms.ASPECT_NAME, "remove", path=("terms", term), value={} - ) - return self - def set_title(self, title: str) -> "ChartPatchBuilder": assert title, "ChartInfo title should not be None" self._add_patch( diff --git a/metadata-ingestion/src/datahub/specific/dashboard.py b/metadata-ingestion/src/datahub/specific/dashboard.py index f6a4051a9579a8..515fcf0c6da955 100644 --- a/metadata-ingestion/src/datahub/specific/dashboard.py +++ b/metadata-ingestion/src/datahub/specific/dashboard.py @@ -6,19 +6,22 @@ ChangeAuditStampsClass, DashboardInfoClass as DashboardInfo, EdgeClass as Edge, - GlossaryTermAssociationClass as Term, - GlossaryTermsClass as GlossaryTerms, KafkaAuditHeaderClass, SystemMetadataClass, ) from datahub.specific.aspect_helpers.custom_properties import HasCustomPropertiesPatch from datahub.specific.aspect_helpers.ownership import HasOwnershipPatch from datahub.specific.aspect_helpers.tags import HasTagsPatch +from datahub.specific.aspect_helpers.terms import HasTermsPatch from datahub.utilities.urns.urn import Urn class DashboardPatchBuilder( - HasOwnershipPatch, HasCustomPropertiesPatch, HasTagsPatch, MetadataPatchProposal + HasOwnershipPatch, + HasCustomPropertiesPatch, + HasTagsPatch, + HasTermsPatch, + MetadataPatchProposal, ): def __init__( self, @@ -210,38 +213,6 @@ def set_chart_edges(self, charts: List[Edge]) -> "DashboardPatchBuilder": ) return self - def add_term(self, term: Term) -> "DashboardPatchBuilder": - """ - Adds a glossary term to the DashboardPatchBuilder. - - Args: - term: The Term object representing the glossary term to be added. - - Returns: - The DashboardPatchBuilder instance. - """ - self._add_patch( - GlossaryTerms.ASPECT_NAME, "add", path=("terms", term.urn), value=term - ) - return self - - def remove_term(self, term: Union[str, Urn]) -> "DashboardPatchBuilder": - """ - Removes a glossary term from the DashboardPatchBuilder. - - Args: - term: The term to remove, specified as a string or Urn object. - - Returns: - The DashboardPatchBuilder instance. - """ - if isinstance(term, str) and not term.startswith("urn:li:glossaryTerm:"): - term = "urn:li:glossaryTerm:" + term - self._add_patch( - GlossaryTerms.ASPECT_NAME, "remove", path=("terms", term), value={} - ) - return self - def set_title(self, title: str) -> "DashboardPatchBuilder": assert title, "DashboardInfo title should not be None" self._add_patch( diff --git a/metadata-ingestion/src/datahub/specific/datajob.py b/metadata-ingestion/src/datahub/specific/datajob.py index bd3baeb1e79fa3..fd826c6dd59ca3 100644 --- a/metadata-ingestion/src/datahub/specific/datajob.py +++ b/metadata-ingestion/src/datahub/specific/datajob.py @@ -5,8 +5,6 @@ DataJobInfoClass as DataJobInfo, DataJobInputOutputClass as DataJobInputOutput, EdgeClass as Edge, - GlossaryTermAssociationClass as Term, - GlossaryTermsClass as GlossaryTerms, KafkaAuditHeaderClass, SystemMetadataClass, ) @@ -14,10 +12,15 @@ from datahub.specific.aspect_helpers.custom_properties import HasCustomPropertiesPatch from datahub.specific.aspect_helpers.ownership import HasOwnershipPatch from datahub.specific.aspect_helpers.tags import HasTagsPatch +from datahub.specific.aspect_helpers.terms import HasTermsPatch class DataJobPatchBuilder( - HasOwnershipPatch, HasCustomPropertiesPatch, HasTagsPatch, MetadataPatchProposal + HasOwnershipPatch, + HasCustomPropertiesPatch, + HasTagsPatch, + HasTermsPatch, + MetadataPatchProposal, ): def __init__( self, @@ -427,35 +430,3 @@ def set_output_dataset_fields(self, outputs: List[Edge]) -> "DataJobPatchBuilder value=outputs, ) return self - - def add_term(self, term: Term) -> "DataJobPatchBuilder": - """ - Adds a glossary term to the DataJobPatchBuilder. - - Args: - term: The Term object representing the glossary term to be added. - - Returns: - The DataJobPatchBuilder instance. - """ - self._add_patch( - GlossaryTerms.ASPECT_NAME, "add", path=("terms", term.urn), value=term - ) - return self - - def remove_term(self, term: Union[str, Urn]) -> "DataJobPatchBuilder": - """ - Removes a glossary term from the DataJobPatchBuilder. - - Args: - term: The term to remove, specified as a string or Urn object. - - Returns: - The DataJobPatchBuilder instance. - """ - if isinstance(term, str) and not term.startswith("urn:li:glossaryTerm:"): - term = "urn:li:glossaryTerm:" + term - self._add_patch( - GlossaryTerms.ASPECT_NAME, "remove", path=("terms", term), value={} - ) - return self diff --git a/metadata-ingestion/src/datahub/specific/dataproduct.py b/metadata-ingestion/src/datahub/specific/dataproduct.py index 9734214a0b5187..d38d2d4156315d 100644 --- a/metadata-ingestion/src/datahub/specific/dataproduct.py +++ b/metadata-ingestion/src/datahub/specific/dataproduct.py @@ -1,22 +1,24 @@ -from typing import List, Optional, Tuple, Union +from typing import List, Optional, Tuple from datahub.emitter.mcp_patch_builder import MetadataPatchProposal, PatchPath from datahub.metadata.schema_classes import ( DataProductAssociationClass as DataProductAssociation, DataProductPropertiesClass as DataProductProperties, - GlossaryTermAssociationClass as Term, - GlossaryTermsClass as GlossaryTerms, KafkaAuditHeaderClass, SystemMetadataClass, ) from datahub.specific.aspect_helpers.custom_properties import HasCustomPropertiesPatch from datahub.specific.aspect_helpers.ownership import HasOwnershipPatch from datahub.specific.aspect_helpers.tags import HasTagsPatch -from datahub.utilities.urns.urn import Urn +from datahub.specific.aspect_helpers.terms import HasTermsPatch class DataProductPatchBuilder( - HasOwnershipPatch, HasCustomPropertiesPatch, HasTagsPatch, MetadataPatchProposal + HasOwnershipPatch, + HasCustomPropertiesPatch, + HasTagsPatch, + HasTermsPatch, + MetadataPatchProposal, ): def __init__( self, @@ -34,20 +36,6 @@ def __init__( def _custom_properties_location(cls) -> Tuple[str, PatchPath]: return DataProductProperties.ASPECT_NAME, ("customProperties",) - def add_term(self, term: Term) -> "DataProductPatchBuilder": - self._add_patch( - GlossaryTerms.ASPECT_NAME, "add", path=("terms", term.urn), value=term - ) - return self - - def remove_term(self, term: Union[str, Urn]) -> "DataProductPatchBuilder": - if isinstance(term, str) and not term.startswith("urn:li:glossaryTerm:"): - term = "urn:li:glossaryTerm:" + term - self._add_patch( - GlossaryTerms.ASPECT_NAME, "remove", path=("terms", term), value={} - ) - return self - def set_name(self, name: str) -> "DataProductPatchBuilder": self._add_patch( DataProductProperties.ASPECT_NAME, diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index 4d52d99e822e78..c8674165562f78 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -8,7 +8,6 @@ EditableSchemaMetadataClass as EditableSchemaMetadata, FineGrainedLineageClass as FineGrainedLineage, GlossaryTermAssociationClass as Term, - GlossaryTermsClass as GlossaryTerms, KafkaAuditHeaderClass, SchemaMetadataClass, SystemMetadataClass, @@ -22,6 +21,7 @@ HasStructuredPropertiesPatch, ) from datahub.specific.aspect_helpers.tags import HasTagsPatch +from datahub.specific.aspect_helpers.terms import HasTermsPatch from datahub.utilities.urns.tag_urn import TagUrn from datahub.utilities.urns.urn import Urn @@ -99,6 +99,7 @@ class DatasetPatchBuilder( HasCustomPropertiesPatch, HasStructuredPropertiesPatch, HasTagsPatch, + HasTermsPatch, MetadataPatchProposal, ): def __init__( @@ -214,20 +215,6 @@ def set_fine_grained_upstream_lineages( ) return self - def add_term(self, term: Term) -> "DatasetPatchBuilder": - self._add_patch( - GlossaryTerms.ASPECT_NAME, "add", path=("terms", term.urn), value=term - ) - return self - - def remove_term(self, term: Union[str, Urn]) -> "DatasetPatchBuilder": - if isinstance(term, str) and not term.startswith("urn:li:glossaryTerm:"): - term = "urn:li:glossaryTerm:" + term - self._add_patch( - GlossaryTerms.ASPECT_NAME, "remove", path=("terms", term), value={} - ) - return self - def for_field( self, field_path: str, editable: bool = True ) -> FieldPatchHelper["DatasetPatchBuilder"]: From 8dcae9bdd66932ab1bda4bd82f8db0bf8551cebf Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 2 Jan 2025 14:40:33 -0500 Subject: [PATCH 10/11] rename helper --- metadata-ingestion/src/datahub/specific/dataset.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/specific/dataset.py b/metadata-ingestion/src/datahub/specific/dataset.py index c8674165562f78..6332386684bbf0 100644 --- a/metadata-ingestion/src/datahub/specific/dataset.py +++ b/metadata-ingestion/src/datahub/specific/dataset.py @@ -154,7 +154,7 @@ def add_fine_grained_upstream_lineage( self._add_patch( UpstreamLineage.ASPECT_NAME, "add", - path=DatasetPatchBuilder.quote_fine_grained_path( + path=self._build_fine_grained_path( transform_op, downstream_urn, query_id, upstream_urn ), value={"confidenceScore": fine_grained_lineage.confidenceScore}, @@ -174,7 +174,7 @@ def get_fine_grained_key( return transform_op, downstream_urn, query_id @classmethod - def quote_fine_grained_path( + def _build_fine_grained_path( cls, transform_op: str, downstream_urn: str, query_id: str, upstream_urn: str ) -> PatchPath: return ( @@ -197,7 +197,7 @@ def remove_fine_grained_upstream_lineage( self._add_patch( UpstreamLineage.ASPECT_NAME, "remove", - path=DatasetPatchBuilder.quote_fine_grained_path( + path=self._build_fine_grained_path( transform_op, downstream_urn, query_id, upstream_urn ), value={}, From d9153abeedabde52542822dae721da84cb8adc46 Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Thu, 2 Jan 2025 14:40:54 -0500 Subject: [PATCH 11/11] fix codecov flags --- .github/workflows/airflow-plugin.yml | 4 ++-- .github/workflows/metadata-ingestion.yml | 9 +++------ .github/workflows/prefect-plugin.yml | 4 ++-- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/.github/workflows/airflow-plugin.yml b/.github/workflows/airflow-plugin.yml index 26fcceb8aeab70..b824a21be63f8f 100644 --- a/.github/workflows/airflow-plugin.yml +++ b/.github/workflows/airflow-plugin.yml @@ -84,8 +84,8 @@ jobs: token: ${{ secrets.CODECOV_TOKEN }} directory: ./build/coverage-reports/ fail_ci_if_error: false - flags: airflow,airflow-${{ matrix.extra_pip_extras }} - name: pytest-airflow-${{ matrix.python-version }}-${{ matrix.extra_pip_requirements }} + flags: airflow-${{ matrix.python-version }}-${{ matrix.extra_pip_extras }} + name: pytest-airflow verbose: true event-file: diff --git a/.github/workflows/metadata-ingestion.yml b/.github/workflows/metadata-ingestion.yml index 106cba1473982e..f4d87b361b5edc 100644 --- a/.github/workflows/metadata-ingestion.yml +++ b/.github/workflows/metadata-ingestion.yml @@ -41,9 +41,6 @@ jobs: "testIntegrationBatch1", "testIntegrationBatch2", ] - include: - - python-version: "3.8" - - python-version: "3.11" fail-fast: false steps: - name: Free up disk space @@ -92,14 +89,14 @@ jobs: **/junit.*.xml !**/binary/** - name: Upload coverage to Codecov - if: ${{ always() && matrix.python-version == '3.10' }} + if: ${{ always() }} uses: codecov/codecov-action@v5 with: token: ${{ secrets.CODECOV_TOKEN }} directory: ./build/coverage-reports/ fail_ci_if_error: false - flags: pytest-${{ matrix.command }} - name: pytest-${{ matrix.python-version }}-${{ matrix.command }} + flags: ingestion-${{ matrix.python-version }}-${{ matrix.command }} + name: pytest-ingestion verbose: true event-file: diff --git a/.github/workflows/prefect-plugin.yml b/.github/workflows/prefect-plugin.yml index d77142a1f00ded..879df032409f28 100644 --- a/.github/workflows/prefect-plugin.yml +++ b/.github/workflows/prefect-plugin.yml @@ -67,8 +67,8 @@ jobs: token: ${{ secrets.CODECOV_TOKEN }} directory: ./build/coverage-reports/ fail_ci_if_error: false - flags: prefect,prefect-${{ matrix.python-version }} - name: pytest-prefect-${{ matrix.python-version }} + flags: prefect-${{ matrix.python-version }} + name: pytest-prefect verbose: true event-file: