-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingestion): Add typeUrn handling to ownership transformers #9370
Changes from 8 commits
337aaa9
4f694ce
e6e13a6
2aa1d13
d994f9d
f7b8f49
22e7fbb
0334f36
7eb113b
e526e14
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -14,11 +14,7 @@ | |||||
from datahub.ingestion.transformer.dataset_transformer import ( | ||||||
DatasetOwnershipTransformer, | ||||||
) | ||||||
from datahub.metadata.schema_classes import ( | ||||||
OwnerClass, | ||||||
OwnershipClass, | ||||||
OwnershipTypeClass, | ||||||
) | ||||||
from datahub.metadata.schema_classes import OwnerClass, OwnershipClass | ||||||
|
||||||
|
||||||
class AddDatasetOwnershipConfig(TransformerSemanticsConfigModel): | ||||||
|
@@ -102,7 +98,7 @@ def transform_aspect( | |||||
|
||||||
|
||||||
class DatasetOwnershipBaseConfig(TransformerSemanticsConfigModel): | ||||||
ownership_type: Optional[str] = OwnershipTypeClass.DATAOWNER | ||||||
ownership_type: Optional[str] | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we don't need to do this in this PR, but it might simplify some of the
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reverted to that (changed it in first place due to addition of another field, now removed). |
||||||
|
||||||
|
||||||
class SimpleDatasetOwnershipConfig(DatasetOwnershipBaseConfig): | ||||||
|
@@ -114,11 +110,14 @@ class SimpleAddDatasetOwnership(AddDatasetOwnership): | |||||
"""Transformer that adds a specified set of owners to each dataset.""" | ||||||
|
||||||
def __init__(self, config: SimpleDatasetOwnershipConfig, ctx: PipelineContext): | ||||||
ownership_type = builder.validate_ownership_type(config.ownership_type) | ||||||
ownership_type, ownership_type_urn = builder.validate_ownership_type( | ||||||
config.ownership_type | ||||||
) | ||||||
owners = [ | ||||||
OwnerClass( | ||||||
owner=owner, | ||||||
type=ownership_type, | ||||||
typeUrn=ownership_type_urn, | ||||||
) | ||||||
for owner in config.owner_urns | ||||||
] | ||||||
|
@@ -147,29 +146,17 @@ class PatternDatasetOwnershipConfig(DatasetOwnershipBaseConfig): | |||||
class PatternAddDatasetOwnership(AddDatasetOwnership): | ||||||
"""Transformer that adds a specified set of owners to each dataset.""" | ||||||
|
||||||
def getOwners( | ||||||
self, | ||||||
key: str, | ||||||
owner_pattern: KeyValuePattern, | ||||||
ownership_type: Optional[str] = None, | ||||||
) -> List[OwnerClass]: | ||||||
owners = [ | ||||||
OwnerClass( | ||||||
owner=owner, | ||||||
type=builder.validate_ownership_type(ownership_type), | ||||||
) | ||||||
for owner in owner_pattern.value(key) | ||||||
] | ||||||
return owners | ||||||
|
||||||
def __init__(self, config: PatternDatasetOwnershipConfig, ctx: PipelineContext): | ||||||
ownership_type = builder.validate_ownership_type(config.ownership_type) | ||||||
owner_pattern = config.owner_pattern | ||||||
ownership_type, ownership_type_urn = builder.validate_ownership_type( | ||||||
config.ownership_type | ||||||
) | ||||||
generic_config = AddDatasetOwnershipConfig( | ||||||
get_owners_to_add=lambda urn: [ | ||||||
OwnerClass( | ||||||
owner=owner, | ||||||
type=ownership_type, | ||||||
typeUrn=ownership_type_urn, | ||||||
) | ||||||
for owner in owner_pattern.value(urn) | ||||||
], | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -210,7 +210,8 @@ def test_simple_dataset_ownership_transformation(mock_time): | |
"owner_urns": [ | ||
builder.make_user_urn("person1"), | ||
builder.make_user_urn("person2"), | ||
] | ||
], | ||
"ownership_type": "DATAOWNER", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are these defaults necessary - it looks like validate_ownership_type will add this default right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mainly just want to double check that this change is backwards compatible There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I removed |
||
}, | ||
PipelineContext(run_id="test"), | ||
) | ||
|
@@ -234,7 +235,7 @@ def test_simple_dataset_ownership_transformation(mock_time): | |
assert last_event.entityUrn == outputs[0].record.proposedSnapshot.urn | ||
assert all( | ||
[ | ||
owner.type == models.OwnershipTypeClass.DATAOWNER | ||
owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None | ||
for owner in last_event.aspect.owners | ||
] | ||
) | ||
|
@@ -247,7 +248,7 @@ def test_simple_dataset_ownership_transformation(mock_time): | |
assert len(second_ownership_aspect.owners) == 3 | ||
assert all( | ||
[ | ||
owner.type == models.OwnershipTypeClass.DATAOWNER | ||
owner.type == models.OwnershipTypeClass.DATAOWNER and owner.typeUrn is None | ||
for owner in second_ownership_aspect.owners | ||
] | ||
) | ||
|
@@ -293,6 +294,44 @@ def test_simple_dataset_ownership_with_type_transformation(mock_time): | |
assert ownership_aspect.owners[0].type == models.OwnershipTypeClass.PRODUCER | ||
|
||
|
||
def test_simple_dataset_ownership_with_type_urn_transformation(mock_time): | ||
input = make_generic_dataset() | ||
|
||
transformer = SimpleAddDatasetOwnership.create( | ||
{ | ||
"owner_urns": [ | ||
builder.make_user_urn("person1"), | ||
], | ||
"ownership_type": "urn:li:ownershipType:__system__technical_owner", | ||
}, | ||
PipelineContext(run_id="test"), | ||
) | ||
|
||
output = list( | ||
transformer.transform( | ||
[ | ||
RecordEnvelope(input, metadata={}), | ||
RecordEnvelope(EndOfStream(), metadata={}), | ||
] | ||
) | ||
) | ||
|
||
assert len(output) == 3 | ||
|
||
# original MCE is unchanged | ||
assert input == output[0].record | ||
|
||
ownership_aspect = output[1].record.aspect | ||
|
||
assert isinstance(ownership_aspect, OwnershipClass) | ||
assert len(ownership_aspect.owners) == 1 | ||
assert ownership_aspect.owners[0].type == OwnershipTypeClass.CUSTOM | ||
assert ( | ||
ownership_aspect.owners[0].typeUrn | ||
== "urn:li:ownershipType:__system__technical_owner" | ||
) | ||
|
||
|
||
def _test_extract_tags(in_urn: str, regex_str: str, out_tag: str) -> None: | ||
input = make_generic_dataset(entity_urn=in_urn) | ||
transformer = ExtractDatasetTags.create( | ||
|
@@ -883,6 +922,7 @@ def test_pattern_dataset_ownership_transformation(mock_time): | |
".*example2.*": [builder.make_user_urn("person2")], | ||
} | ||
}, | ||
"ownership_type": "DATAOWNER", | ||
}, | ||
PipelineContext(run_id="test"), | ||
) | ||
|
@@ -2233,6 +2273,7 @@ def fake_ownership_class(entity_urn: str) -> models.OwnershipClass: | |
"replace_existing": False, | ||
"semantics": TransformerSemantics.PATCH, | ||
"owner_urns": [owner2], | ||
"ownership_type": "DATAOWNER", | ||
}, | ||
pipeline_context=pipeline_context, | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this cast call isn't required
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, refactored and simplified this function