Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(transform): adds simple add dataset properties transform #3778

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABC, abstractmethod
from typing import Dict, Type
from typing import Any, Dict, Type

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
Expand Down Expand Up @@ -34,9 +34,15 @@ class AddDatasetProperties(DatasetTransformer):
ctx: PipelineContext
config: AddDatasetPropertiesConfig

def __init__(self, config: AddDatasetPropertiesConfig, ctx: PipelineContext):
def __init__(
self,
config: AddDatasetPropertiesConfig,
ctx: PipelineContext,
**resolver_args: Dict[str, Any],
):
self.ctx = ctx
self.config = config
self.resolver_args = resolver_args

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetProperties":
Expand All @@ -47,15 +53,43 @@ def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventCla
if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass):
return mce

properties_to_add = (
self.config.add_properties_resolver_class().get_properties_to_add(
mce.proposedSnapshot
)
)
properties_to_add = self.config.add_properties_resolver_class( # type: ignore
**self.resolver_args
).get_properties_to_add(mce.proposedSnapshot)
if properties_to_add:
properties = builder.get_or_add_aspect(
mce, DatasetPropertiesClass(customProperties={})
)
properties.customProperties.update(properties_to_add)

return mce


class SimpleAddDatasetPropertiesConfig(ConfigModel):
properties: Dict[str, str]


class SimpleAddDatasetPropertiesResolverClass(AddDatasetPropertiesResolverBase):
def __init__(self, properties: Dict[str, str]):
self.properties = properties

def get_properties_to_add(self, current: DatasetSnapshotClass) -> Dict[str, str]:
return self.properties


class SimpleAddDatasetProperties(AddDatasetProperties):
"""Transformer that adds a specified set of properties to each dataset."""

def __init__(self, config: SimpleAddDatasetPropertiesConfig, ctx: PipelineContext):
generic_config = AddDatasetPropertiesConfig(
add_properties_resolver_class=SimpleAddDatasetPropertiesResolverClass
)
resolver_args = {"properties": config.properties}
super().__init__(generic_config, ctx, **resolver_args)

@classmethod
def create(
cls, config_dict: dict, ctx: PipelineContext
) -> "SimpleAddDatasetProperties":
config = SimpleAddDatasetPropertiesConfig.parse_obj(config_dict)
return cls(config, ctx)
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@
PatternAddDatasetOwnership,
SimpleAddDatasetOwnership,
)
from datahub.ingestion.transformer.add_dataset_properties import AddDatasetProperties
from datahub.ingestion.transformer.add_dataset_properties import (
AddDatasetProperties,
SimpleAddDatasetProperties,
)
from datahub.ingestion.transformer.add_dataset_tags import (
AddDatasetTags,
PatternAddDatasetTags,
Expand Down Expand Up @@ -45,3 +48,4 @@
transform_registry.register("pattern_add_dataset_terms", PatternAddDatasetTerms)

transform_registry.register("add_dataset_properties", AddDatasetProperties)
transform_registry.register("simple_add_dataset_properties", SimpleAddDatasetProperties)
35 changes: 34 additions & 1 deletion metadata-ingestion/tests/unit/test_transform_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from datahub.ingestion.transformer.add_dataset_properties import (
AddDatasetProperties,
AddDatasetPropertiesResolverBase,
SimpleAddDatasetProperties,
)
from datahub.ingestion.transformer.add_dataset_tags import (
AddDatasetTags,
Expand Down Expand Up @@ -75,7 +76,9 @@ def make_dataset_with_properties() -> models.MetadataChangeEventClass:
urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
aspects=[
models.StatusClass(removed=False),
models.DatasetPropertiesClass(customProperties=EXISTING_PROPERTIES),
models.DatasetPropertiesClass(
customProperties=EXISTING_PROPERTIES.copy()
),
],
),
)
Expand Down Expand Up @@ -641,6 +644,36 @@ def test_add_dataset_properties(mock_time):
}


def test_simple_add_dataset_properties(mock_time):
dataset_mce = make_dataset_with_properties()

new_properties = {"new-simple-property": "new-value"}
transformer = SimpleAddDatasetProperties.create(
{
"properties": new_properties,
},
PipelineContext(run_id="test-simple-properties"),
)

outputs = list(
transformer.transform(
[RecordEnvelope(input, metadata={}) for input in [dataset_mce]]
)
)
assert len(outputs) == 1

custom_properties = builder.get_aspect_if_available(
outputs[0].record, models.DatasetPropertiesClass
)

print(str(custom_properties))
assert custom_properties is not None
assert custom_properties.customProperties == {
**EXISTING_PROPERTIES,
**new_properties,
}


def test_simple_dataset_terms_transformation(mock_time):
dataset_mce = make_generic_dataset()

Expand Down