Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion): Add typeUrn handling to ownership transformers #9370

Merged
32 changes: 16 additions & 16 deletions metadata-ingestion/docs/transformer/dataset_transformer.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ transformers:
```
## Simple Add Dataset ownership
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|----------|--------------|---------------|------------------------------------------------------------------|
| `owner_urns` | ✅ | list[string] | | List of owner urns. |
| `ownership_type` | | string | `DATAOWNER` | ownership type of the owners. |
| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| Field | Required | Type | Default | Description |
|--------------------|----------|--------------|-------------|---------------------------------------------------------------------|
| `owner_urns` | ✅ | list[string] | | List of owner urns. |
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |

For transformer behaviour on `replace_existing` and `semantics`, please refer section [Relationship Between replace_existing And semantics](#relationship-between-replace_existing-and-semantics).

Expand Down Expand Up @@ -95,7 +95,7 @@ transformers:
- "urn:li:corpuser:username1"
- "urn:li:corpuser:username2"
- "urn:li:corpGroup:groupname"
ownership_type: "PRODUCER"
ownership_type: "urn:li:ownershipType:__system__producer"
```
- Add owners, however overwrite the owners available for the dataset on DataHub GMS
```yaml
Expand All @@ -107,7 +107,7 @@ transformers:
- "urn:li:corpuser:username1"
- "urn:li:corpuser:username2"
- "urn:li:corpGroup:groupname"
ownership_type: "PRODUCER"
ownership_type: "urn:li:ownershipType:__system__producer"
```
- Add owners, however keep the owners available for the dataset on DataHub GMS
```yaml
Expand All @@ -124,12 +124,12 @@ transformers:

## Pattern Add Dataset ownership
### Config Details
| Field | Required | Type | Default | Description |
|-----------------------------|--------- |-----------------------|------------------|-----------------------------------------------------------------------------------------|
| `owner_pattern` | ✅ | map[regx, list[urn]] | | entity urn with regular expression and list of owners urn apply to matching entity urn. |
| `ownership_type` | | string | `DATAOWNER` | ownership type of the owners. |
| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |
| Field | Required | Type | Default | Description |
|--------------------|----------|----------------------|-------------|-----------------------------------------------------------------------------------------|
| `owner_pattern` | ✅ | map[regx, list[urn]] | | entity urn with regular expression and list of owners urn apply to matching entity urn. |
| `ownership_type` | | string | "DATAOWNER" | ownership type of the owners (either as enum or ownership type urn) |
| `replace_existing` | | boolean | `false` | Whether to remove owners from entity sent by ingestion source. |
| `semantics` | | enum | `OVERWRITE` | Whether to OVERWRITE or PATCH the entity present on DataHub GMS. |

let’s suppose we’d like to append a series of users who we know to own a different dataset from a data source but aren't detected during normal ingestion. To do so, we can use the `pattern_add_dataset_ownership` module that’s included in the ingestion framework. This will match the pattern to `urn` of the dataset and assign the respective owners.

Expand Down Expand Up @@ -158,7 +158,7 @@ The config, which we’d append to our ingestion recipe YAML, would look like th
rules:
".*example1.*": ["urn:li:corpuser:username1"]
".*example2.*": ["urn:li:corpuser:username2"]
ownership_type: "PRODUCER"
ownership_type: "urn:li:ownershipType:__system__producer"
```
- Add owner, however overwrite the owners available for the dataset on DataHub GMS
```yaml
Expand All @@ -170,7 +170,7 @@ The config, which we’d append to our ingestion recipe YAML, would look like th
rules:
".*example1.*": ["urn:li:corpuser:username1"]
".*example2.*": ["urn:li:corpuser:username2"]
ownership_type: "PRODUCER"
ownership_type: "urn:li:ownershipType:__system__producer"
```
- Add owner, however keep the owners available for the dataset on DataHub GMS
```yaml
Expand Down
31 changes: 13 additions & 18 deletions metadata-ingestion/src/datahub/emitter/mce_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,13 @@
from typing import (
TYPE_CHECKING,
Any,
Iterable,
List,
Optional,
Tuple,
Type,
TypeVar,
Union,
cast,
get_type_hints,
)

Expand Down Expand Up @@ -342,26 +343,20 @@ def make_ml_model_group_urn(platform: str, group_name: str, env: str) -> str:
)


def is_valid_ownership_type(ownership_type: Optional[str]) -> bool:
return ownership_type is not None and ownership_type in [
OwnershipTypeClass.TECHNICAL_OWNER,
OwnershipTypeClass.BUSINESS_OWNER,
OwnershipTypeClass.DATA_STEWARD,
OwnershipTypeClass.NONE,
OwnershipTypeClass.DEVELOPER,
OwnershipTypeClass.DATAOWNER,
OwnershipTypeClass.DELEGATE,
OwnershipTypeClass.PRODUCER,
OwnershipTypeClass.CONSUMER,
OwnershipTypeClass.STAKEHOLDER,
def get_class_fields(_class: Type[object]) -> Iterable[str]:
return [
f
for f in dir(_class)
if not callable(getattr(_class, f)) and not f.startswith("_")
]


def validate_ownership_type(ownership_type: Optional[str]) -> str:
if is_valid_ownership_type(ownership_type):
return cast(str, ownership_type)
else:
raise ValueError(f"Unexpected ownership type: {ownership_type}")
def validate_ownership_type(ownership_type: str) -> Tuple[str, Optional[str]]:
if ownership_type.startswith("urn:li:"):
return OwnershipTypeClass.CUSTOM, ownership_type
if ownership_type in get_class_fields(OwnershipTypeClass):
return ownership_type, None
raise ValueError(f"Unexpected ownership type: {ownership_type}")


def make_lineage_mce(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@
from datahub.ingestion.transformer.dataset_transformer import (
DatasetOwnershipTransformer,
)
from datahub.metadata.schema_classes import (
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
)
from datahub.metadata._schema_classes import OwnershipTypeClass
from datahub.metadata.schema_classes import OwnerClass, OwnershipClass


class AddDatasetOwnershipConfig(TransformerSemanticsConfigModel):
Expand Down Expand Up @@ -102,7 +99,7 @@ def transform_aspect(


class DatasetOwnershipBaseConfig(TransformerSemanticsConfigModel):
ownership_type: Optional[str] = OwnershipTypeClass.DATAOWNER
ownership_type: str = OwnershipTypeClass.DATAOWNER


class SimpleDatasetOwnershipConfig(DatasetOwnershipBaseConfig):
Expand All @@ -114,11 +111,14 @@ class SimpleAddDatasetOwnership(AddDatasetOwnership):
"""Transformer that adds a specified set of owners to each dataset."""

def __init__(self, config: SimpleDatasetOwnershipConfig, ctx: PipelineContext):
ownership_type = builder.validate_ownership_type(config.ownership_type)
ownership_type, ownership_type_urn = builder.validate_ownership_type(
config.ownership_type
)
owners = [
OwnerClass(
owner=owner,
type=ownership_type,
typeUrn=ownership_type_urn,
)
for owner in config.owner_urns
]
Expand Down Expand Up @@ -147,29 +147,17 @@ class PatternDatasetOwnershipConfig(DatasetOwnershipBaseConfig):
class PatternAddDatasetOwnership(AddDatasetOwnership):
"""Transformer that adds a specified set of owners to each dataset."""

def getOwners(
self,
key: str,
owner_pattern: KeyValuePattern,
ownership_type: Optional[str] = None,
) -> List[OwnerClass]:
owners = [
OwnerClass(
owner=owner,
type=builder.validate_ownership_type(ownership_type),
)
for owner in owner_pattern.value(key)
]
return owners

def __init__(self, config: PatternDatasetOwnershipConfig, ctx: PipelineContext):
ownership_type = builder.validate_ownership_type(config.ownership_type)
owner_pattern = config.owner_pattern
ownership_type, ownership_type_urn = builder.validate_ownership_type(
config.ownership_type
)
generic_config = AddDatasetOwnershipConfig(
get_owners_to_add=lambda urn: [
OwnerClass(
owner=owner,
type=ownership_type,
typeUrn=ownership_type_urn,
)
for owner in owner_pattern.value(urn)
],
Expand Down
5 changes: 4 additions & 1 deletion metadata-ingestion/tests/unit/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,10 @@ def test_run_including_registered_transformation(self):
"transformers": [
{
"type": "simple_add_dataset_ownership",
"config": {"owner_urns": ["urn:li:corpuser:foo"]},
"config": {
"owner_urns": ["urn:li:corpuser:foo"],
"ownership_type": "urn:li:ownershipType:__system__technical_owner",
},
}
],
"sink": {"type": "tests.test_helpers.sink_helpers.RecordingSink"},
Expand Down
44 changes: 42 additions & 2 deletions metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ def test_simple_dataset_ownership_transformation(mock_time):
assert last_event.entityUrn == outputs[0].record.proposedSnapshot.urn
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None
for owner in last_event.aspect.owners
]
)
Expand All @@ -247,7 +247,7 @@ def test_simple_dataset_ownership_transformation(mock_time):
assert len(second_ownership_aspect.owners) == 3
assert all(
[
owner.type == models.OwnershipTypeClass.DATAOWNER
owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None
for owner in second_ownership_aspect.owners
]
)
Expand Down Expand Up @@ -293,6 +293,44 @@ def test_simple_dataset_ownership_with_type_transformation(mock_time):
assert ownership_aspect.owners[0].type == models.OwnershipTypeClass.PRODUCER


def test_simple_dataset_ownership_with_type_urn_transformation(mock_time):
input = make_generic_dataset()

transformer = SimpleAddDatasetOwnership.create(
{
"owner_urns": [
builder.make_user_urn("person1"),
],
"ownership_type": "urn:li:ownershipType:__system__technical_owner",
},
PipelineContext(run_id="test"),
)

output = list(
transformer.transform(
[
RecordEnvelope(input, metadata={}),
RecordEnvelope(EndOfStream(), metadata={}),
]
)
)

assert len(output) == 3

# original MCE is unchanged
assert input == output[0].record

ownership_aspect = output[1].record.aspect

assert isinstance(ownership_aspect, OwnershipClass)
assert len(ownership_aspect.owners) == 1
assert ownership_aspect.owners[0].type == OwnershipTypeClass.CUSTOM
assert (
ownership_aspect.owners[0].typeUrn
== "urn:li:ownershipType:__system__technical_owner"
)


def _test_extract_tags(in_urn: str, regex_str: str, out_tag: str) -> None:
input = make_generic_dataset(entity_urn=in_urn)
transformer = ExtractDatasetTags.create(
Expand Down Expand Up @@ -883,6 +921,7 @@ def test_pattern_dataset_ownership_transformation(mock_time):
".*example2.*": [builder.make_user_urn("person2")],
}
},
"ownership_type": "DATAOWNER",
},
PipelineContext(run_id="test"),
)
Expand Down Expand Up @@ -2233,6 +2272,7 @@ def fake_ownership_class(entity_urn: str) -> models.OwnershipClass:
"replace_existing": False,
"semantics": TransformerSemantics.PATCH,
"owner_urns": [owner2],
"ownership_type": "DATAOWNER",
},
pipeline_context=pipeline_context,
)
Expand Down
Loading