diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py
index bd5f8906b7f96..6e1a0cc4ddc93 100644
--- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py
@@ -1,5 +1,5 @@
 from abc import ABC, abstractmethod
-from typing import Dict, Type
+from typing import Any, Dict, Type
 
 import datahub.emitter.mce_builder as builder
 from datahub.configuration.common import ConfigModel
@@ -34,9 +34,17 @@ class AddDatasetProperties(DatasetTransformer):
     ctx: PipelineContext
     config: AddDatasetPropertiesConfig
 
-    def __init__(self, config: AddDatasetPropertiesConfig, ctx: PipelineContext):
+    def __init__(
+        self,
+        config: AddDatasetPropertiesConfig,
+        ctx: PipelineContext,
+        **resolver_args: Any,
+    ):
         self.ctx = ctx
         self.config = config
+        # Forwarded as keyword arguments to the resolver class constructor
+        # every time transform_one() instantiates it.
+        self.resolver_args = resolver_args
 
     @classmethod
     def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetProperties":
@@ -47,11 +55,9 @@ def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventCla
         if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass):
             return mce
 
-        properties_to_add = (
-            self.config.add_properties_resolver_class().get_properties_to_add(
-                mce.proposedSnapshot
-            )
-        )
+        properties_to_add = self.config.add_properties_resolver_class(  # type: ignore
+            **self.resolver_args
+        ).get_properties_to_add(mce.proposedSnapshot)
         if properties_to_add:
             properties = builder.get_or_add_aspect(
                 mce, DatasetPropertiesClass(customProperties={})
@@ -59,3 +65,42 @@ def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventCla
             properties.customProperties.update(properties_to_add)
 
         return mce
+
+
+class SimpleAddDatasetPropertiesConfig(ConfigModel):
+    """Config for SimpleAddDatasetProperties."""
+
+    # Key/value pairs added to the custom properties of every dataset.
+    properties: Dict[str, str]
+
+
+class SimpleAddDatasetPropertiesResolverClass(AddDatasetPropertiesResolverBase):
+    """Resolver that returns the same fixed properties for every dataset."""
+
+    def __init__(self, properties: Dict[str, str]):
+        self.properties = properties
+
+    def get_properties_to_add(self, current: DatasetSnapshotClass) -> Dict[str, str]:
+        # The snapshot is ignored: the configured properties are static.
+        return self.properties
+
+
+class SimpleAddDatasetProperties(AddDatasetProperties):
+    """Transformer that adds a specified set of properties to each dataset."""
+
+    def __init__(self, config: SimpleAddDatasetPropertiesConfig, ctx: PipelineContext):
+        # Reuse the generic transformer with a resolver that always returns
+        # config.properties.
+        generic_config = AddDatasetPropertiesConfig(
+            add_properties_resolver_class=SimpleAddDatasetPropertiesResolverClass
+        )
+        resolver_args = {"properties": config.properties}
+        super().__init__(generic_config, ctx, **resolver_args)
+
+    @classmethod
+    def create(
+        cls, config_dict: dict, ctx: PipelineContext
+    ) -> "SimpleAddDatasetProperties":
+        """Build the transformer from a raw config dict."""
+        config = SimpleAddDatasetPropertiesConfig.parse_obj(config_dict)
+        return cls(config, ctx)
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py b/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py
index 4dd10c120d3ee..ac8b0f7222f5c 100644
--- a/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py
@@ -8,7 +8,10 @@
     PatternAddDatasetOwnership,
     SimpleAddDatasetOwnership,
 )
-from datahub.ingestion.transformer.add_dataset_properties import AddDatasetProperties
+from datahub.ingestion.transformer.add_dataset_properties import (
+    AddDatasetProperties,
+    SimpleAddDatasetProperties,
+)
 from datahub.ingestion.transformer.add_dataset_tags import (
     AddDatasetTags,
     PatternAddDatasetTags,
@@ -45,3 +48,4 @@
 transform_registry.register("pattern_add_dataset_terms", PatternAddDatasetTerms)
 
 transform_registry.register("add_dataset_properties", AddDatasetProperties)
+transform_registry.register("simple_add_dataset_properties", SimpleAddDatasetProperties)
diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py
index 8fa53250a9826..dc462cbb882a9 100644
--- a/metadata-ingestion/tests/unit/test_transform_dataset.py
+++ b/metadata-ingestion/tests/unit/test_transform_dataset.py
@@ -17,6 +17,7 @@
 from datahub.ingestion.transformer.add_dataset_properties import (
     AddDatasetProperties,
     AddDatasetPropertiesResolverBase,
+    SimpleAddDatasetProperties,
 )
 from datahub.ingestion.transformer.add_dataset_tags import (
     AddDatasetTags,
@@ -75,7 +76,10 @@ def make_dataset_with_properties() -> models.MetadataChangeEventClass:
             urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)",
             aspects=[
                 models.StatusClass(removed=False),
-                models.DatasetPropertiesClass(customProperties=EXISTING_PROPERTIES),
+                models.DatasetPropertiesClass(
+                    # Copy: transformers update customProperties in place.
+                    customProperties=EXISTING_PROPERTIES.copy()
+                ),
             ],
         ),
     )
@@ -641,6 +645,36 @@ def test_add_dataset_properties(mock_time):
     }
 
 
+def test_simple_add_dataset_properties(mock_time):
+    """Configured properties are merged into the existing custom properties."""
+    dataset_mce = make_dataset_with_properties()
+
+    new_properties = {"new-simple-property": "new-value"}
+    transformer = SimpleAddDatasetProperties.create(
+        {
+            "properties": new_properties,
+        },
+        PipelineContext(run_id="test-simple-properties"),
+    )
+
+    outputs = list(
+        transformer.transform(
+            [RecordEnvelope(mce, metadata={}) for mce in [dataset_mce]]
+        )
+    )
+    assert len(outputs) == 1
+
+    custom_properties = builder.get_aspect_if_available(
+        outputs[0].record, models.DatasetPropertiesClass
+    )
+
+    assert custom_properties is not None
+    assert custom_properties.customProperties == {
+        **EXISTING_PROPERTIES,
+        **new_properties,
+    }
+
+
 def test_simple_dataset_terms_transformation(mock_time):
     dataset_mce = make_generic_dataset()
 