Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(sdk/patch): improve patch implementation internals #12253

Merged
merged 11 commits into from
Jan 2, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/airflow-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}
directory: ./build/coverage-reports/
fail_ci_if_error: false
flags: airflow,airflow-${{ matrix.extra_pip_extras }}
name: pytest-airflow-${{ matrix.python-version }}-${{ matrix.extra_pip_requirements }}
flags: airflow-${{ matrix.python-version }}-${{ matrix.extra_pip_extras }}
name: pytest-airflow
verbose: true

event-file:
Expand Down
9 changes: 3 additions & 6 deletions .github/workflows/metadata-ingestion.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ jobs:
"testIntegrationBatch1",
"testIntegrationBatch2",
]
include:
- python-version: "3.8"
- python-version: "3.11"
fail-fast: false
steps:
- name: Free up disk space
Expand Down Expand Up @@ -92,14 +89,14 @@ jobs:
**/junit.*.xml
!**/binary/**
- name: Upload coverage to Codecov
if: ${{ always() && matrix.python-version == '3.10' }}
if: ${{ always() }}
uses: codecov/codecov-action@v5
with:
token: ${{ secrets.CODECOV_TOKEN }}
directory: ./build/coverage-reports/
fail_ci_if_error: false
flags: pytest-${{ matrix.command }}
name: pytest-${{ matrix.python-version }}-${{ matrix.command }}
flags: ingestion-${{ matrix.python-version }}-${{ matrix.command }}
name: pytest-ingestion
verbose: true

event-file:
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/prefect-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,8 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}
directory: ./build/coverage-reports/
fail_ci_if_error: false
flags: prefect,prefect-${{ matrix.python-version }}
name: pytest-prefect-${{ matrix.python-version }}
flags: prefect-${{ matrix.python-version }}
name: pytest-prefect
verbose: true

event-file:
Expand Down
6 changes: 3 additions & 3 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import typing_inspect
from avrogen.dict_wrapper import DictWrapper
from typing_extensions import assert_never

from datahub.emitter.enum_helpers import get_enum_options
from datahub.metadata.schema_classes import (
Expand Down Expand Up @@ -269,9 +270,8 @@
return make_user_urn(owner)
elif owner_type == OwnerType.GROUP:
return make_group_urn(owner)
# This should pretty much never happen.
# TODO: With Python 3.11, we can use typing.assert_never() here.
return f"urn:li:{owner_type.value}:{owner}"
else:
assert_never(owner_type)

Check warning on line 274 in metadata-ingestion/src/datahub/emitter/mce_builder.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/emitter/mce_builder.py#L274

Added line #L274 was not covered by tests


def make_ownership_type_urn(type: str) -> str:
Expand Down
48 changes: 36 additions & 12 deletions metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,19 @@
import time
from collections import defaultdict
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Sequence, Union
from typing import (
Any,
Dict,
List,
Literal,
Optional,
Protocol,
Tuple,
Union,
runtime_checkable,
)

from typing_extensions import LiteralString

from datahub.emitter.aspect import JSON_PATCH_CONTENT_TYPE
from datahub.emitter.serialization_helper import pre_json_transform
Expand All @@ -19,25 +31,36 @@
from datahub.utilities.urns.urn import guess_entity_type


@runtime_checkable
class SupportsToObj(Protocol):
def to_obj(self) -> Any:
...

Check warning on line 37 in metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/emitter/mcp_patch_builder.py#L37

Added line #L37 was not covered by tests


def _recursive_to_obj(obj: Any) -> Any:
if isinstance(obj, list):
return [_recursive_to_obj(v) for v in obj]
elif hasattr(obj, "to_obj"):
elif isinstance(obj, SupportsToObj):
return obj.to_obj()
else:
return obj


PatchPath = Tuple[Union[LiteralString, Urn], ...]
PatchOp = Literal["add", "remove", "replace"]


@dataclass
class _Patch:
op: str # one of ['add', 'remove', 'replace']; we don't support move, copy or test
path: str
class _Patch(SupportsToObj):
op: PatchOp
path: PatchPath
value: Any

def to_obj(self) -> Dict:
quoted_path = "/" + "/".join(MetadataPatchProposal.quote(p) for p in self.path)
return {
"op": self.op,
"path": self.path,
"path": quoted_path,
"value": _recursive_to_obj(self.value),
}

Expand All @@ -63,15 +86,16 @@

# Json Patch quoting based on https://jsonpatch.com/#json-pointer
@classmethod
def quote(cls, value: str) -> str:
return value.replace("~", "~0").replace("/", "~1")
def quote(cls, value: Union[str, Urn]) -> str:
return str(value).replace("~", "~0").replace("/", "~1")

def _add_patch(
self, aspect_name: str, op: str, path: Union[str, Sequence[str]], value: Any
self,
aspect_name: str,
op: PatchOp,
path: PatchPath,
value: Any,
) -> None:
if not isinstance(path, str):
path = "/" + "/".join(self.quote(p) for p in path)

# TODO: Validate that aspectName is a valid aspect for this entityType
self.patches[aspect_name].append(_Patch(op, path, value))

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from abc import abstractmethod
from typing import Dict, Optional, Tuple

from typing_extensions import Self

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal, PatchPath


class HasCustomPropertiesPatch(MetadataPatchProposal):
@classmethod
@abstractmethod
def _custom_properties_location(self) -> Tuple[str, PatchPath]:
...

def add_custom_property(self, key: str, value: str) -> Self:
"""Add a custom property to the entity.

Args:
key: The key of the custom property.
value: The value of the custom property.

Returns:
The patch builder instance.
"""
aspect_name, path = self._custom_properties_location()
self._add_patch(
aspect_name,
"add",
path=(*path, key),
value=value,
)
return self

def add_custom_properties(
self, custom_properties: Optional[Dict[str, str]] = None
) -> Self:
if custom_properties is not None:
for key, value in custom_properties.items():
self.add_custom_property(key, value)
return self

def remove_custom_property(self, key: str) -> Self:
"""Remove a custom property from the entity.

Args:
key: The key of the custom property to remove.

Returns:
The patch builder instance.
"""
aspect_name, path = self._custom_properties_location()
self._add_patch(

Check warning on line 52 in metadata-ingestion/src/datahub/specific/aspect_helpers/custom_properties.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/specific/aspect_helpers/custom_properties.py#L51-L52

Added lines #L51 - L52 were not covered by tests
aspect_name,
"remove",
path=(*path, key),
value={},
)
return self

Check warning on line 58 in metadata-ingestion/src/datahub/specific/aspect_helpers/custom_properties.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/specific/aspect_helpers/custom_properties.py#L58

Added line #L58 was not covered by tests

def set_custom_properties(self, custom_properties: Dict[str, str]) -> Self:
"""Sets the custom properties of the entity.

This method replaces all existing custom properties with the given dictionary.

Args:
custom_properties: A dictionary containing the custom properties to be set.

Returns:
The patch builder instance.
"""

aspect_name, path = self._custom_properties_location()
self._add_patch(
aspect_name,
"add",
path=path,
value=custom_properties,
)
return self
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from typing import List, Optional

from typing_extensions import Self

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
)


class HasOwnershipPatch(MetadataPatchProposal):
def add_owner(self, owner: OwnerClass) -> Self:
"""Add an owner to the entity.

Args:
owner: The Owner object to add.

Returns:
The patch builder instance.
"""
self._add_patch(
OwnershipClass.ASPECT_NAME,
"add",
path=("owners", owner.owner, str(owner.type)),
value=owner,
)
return self

def remove_owner(
self, owner: str, owner_type: Optional[OwnershipTypeClass] = None
) -> Self:
"""Remove an owner from the entity.

If owner_type is not provided, the owner will be removed regardless of ownership type.

Args:
owner: The owner to remove.
owner_type: The ownership type of the owner (optional).

Returns:
The patch builder instance.
"""
self._add_patch(

Check warning on line 45 in metadata-ingestion/src/datahub/specific/aspect_helpers/ownership.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/specific/aspect_helpers/ownership.py#L45

Added line #L45 was not covered by tests
OwnershipClass.ASPECT_NAME,
"remove",
path=("owners", owner) + ((str(owner_type),) if owner_type else ()),
value=owner,
)
return self

Check warning on line 51 in metadata-ingestion/src/datahub/specific/aspect_helpers/ownership.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/specific/aspect_helpers/ownership.py#L51

Added line #L51 was not covered by tests

def set_owners(self, owners: List[OwnerClass]) -> Self:
"""Set the owners of the entity.

This will effectively replace all existing owners with the new list - it doesn't really patch things.

Args:
owners: The list of owners to set.

Returns:
The patch builder instance.
"""
self._add_patch(

Check warning on line 64 in metadata-ingestion/src/datahub/specific/aspect_helpers/ownership.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/specific/aspect_helpers/ownership.py#L64

Added line #L64 was not covered by tests
OwnershipClass.ASPECT_NAME, "add", path=("owners",), value=owners
)
return self

Check warning on line 67 in metadata-ingestion/src/datahub/specific/aspect_helpers/ownership.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/specific/aspect_helpers/ownership.py#L67

Added line #L67 was not covered by tests
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from typing import List, Union

from typing_extensions import Self

from datahub.emitter.mcp_patch_builder import MetadataPatchProposal
from datahub.metadata.schema_classes import (
StructuredPropertiesClass,
StructuredPropertyValueAssignmentClass,
)
from datahub.utilities.urns.structured_properties_urn import (
make_structured_property_urn,
)


class HasStructuredPropertiesPatch(MetadataPatchProposal):
def set_structured_property(
self, key: str, value: Union[str, float, List[Union[str, float]]]
) -> Self:
"""Add or update a structured property.

Args:
key: the name of the property (either bare or urn form)
value: the value of the property (for multi-valued properties, this can be a list)

Returns:
The patch builder instance.
"""
self.remove_structured_property(key)
self.add_structured_property(key, value)
return self

Check warning on line 30 in metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py#L28-L30

Added lines #L28 - L30 were not covered by tests

def remove_structured_property(self, key: str) -> Self:
"""Remove a structured property.

Args:
key: the name of the property (either bare or urn form)

Returns:
The patch builder instance.
"""

self._add_patch(

Check warning on line 42 in metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py#L42

Added line #L42 was not covered by tests
StructuredPropertiesClass.ASPECT_NAME,
"remove",
path=("properties", make_structured_property_urn(key)),
value={},
)
return self

Check warning on line 48 in metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py#L48

Added line #L48 was not covered by tests

def add_structured_property(
self, key: str, value: Union[str, float, List[Union[str, float]]]
) -> Self:
"""Add a structured property.

Args:
key: the name of the property (either bare or urn form)
value: the value of the property (for multi-valued properties, this value will be appended to the list)

Returns:
The patch builder instance.
"""

self._add_patch(

Check warning on line 63 in metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py#L63

Added line #L63 was not covered by tests
StructuredPropertiesClass.ASPECT_NAME,
"add",
path=("properties", make_structured_property_urn(key)),
value=StructuredPropertyValueAssignmentClass(
propertyUrn=make_structured_property_urn(key),
values=value if isinstance(value, list) else [value],
),
)
return self

Check warning on line 72 in metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/specific/aspect_helpers/structured_properties.py#L72

Added line #L72 was not covered by tests
Loading
Loading