From 473774e471691d005a2573eb8623bb6f6e5c46d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Go=CC=81mez?= Date: Tue, 21 Dec 2021 13:50:21 +0100 Subject: [PATCH 1/3] feat(transform): adds simple add dataset properties transform --- .../transformer/add_dataset_properties.py | 35 +++++++++++++++++-- .../transformer/transform_registry.py | 6 +++- .../tests/unit/test_transform_dataset.py | 35 ++++++++++++++++++- 3 files changed, 72 insertions(+), 4 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py index bd5f8906b7f96..133032563d1b0 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py @@ -34,9 +34,10 @@ class AddDatasetProperties(DatasetTransformer): ctx: PipelineContext config: AddDatasetPropertiesConfig - def __init__(self, config: AddDatasetPropertiesConfig, ctx: PipelineContext): + def __init__(self, config: AddDatasetPropertiesConfig, ctx: PipelineContext, **resolver_args): self.ctx = ctx self.config = config + self.resolver_args = resolver_args @classmethod def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetProperties": @@ -48,7 +49,7 @@ def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventCla return mce properties_to_add = ( - self.config.add_properties_resolver_class().get_properties_to_add( + self.config.add_properties_resolver_class(**self.resolver_args).get_properties_to_add( mce.proposedSnapshot ) ) @@ -59,3 +60,33 @@ def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventCla properties.customProperties.update(properties_to_add) return mce + + +class SimpleAddDatasetPropertiesConfig(ConfigModel): + properties: Dict[str, str] + + +class SimpleAddDatasetPropertiesResolverClass(AddDatasetPropertiesResolverBase): + def __init__(self, properties: Dict[str, str]): + self.properties = properties + + def get_properties_to_add(self, current: DatasetSnapshotClass) -> Dict[str, str]: + return self.properties + + +class SimpleAddDatasetProperties(AddDatasetProperties): + """Transformer that adds a specified set of properties to each dataset.""" + + def __init__(self, config: SimpleAddDatasetPropertiesConfig, ctx: PipelineContext): + generic_config = AddDatasetPropertiesConfig( + add_properties_resolver_class=SimpleAddDatasetPropertiesResolverClass + ) + resolver_args = { + "properties": config.properties + } + super().__init__(generic_config, ctx, **resolver_args) + + @classmethod + def create(cls, config_dict: dict, ctx: PipelineContext) -> "SimpleAddDatasetProperties": + config = SimpleAddDatasetPropertiesConfig.parse_obj(config_dict) + return cls(config, ctx) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py b/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py index 4dd10c120d3ee..ac8b0f7222f5c 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/transform_registry.py @@ -8,7 +8,10 @@ PatternAddDatasetOwnership, SimpleAddDatasetOwnership, ) -from datahub.ingestion.transformer.add_dataset_properties import AddDatasetProperties +from datahub.ingestion.transformer.add_dataset_properties import ( + AddDatasetProperties, + SimpleAddDatasetProperties, +) from datahub.ingestion.transformer.add_dataset_tags import ( AddDatasetTags, PatternAddDatasetTags, @@ -45,3 +48,4 @@ transform_registry.register("pattern_add_dataset_terms", PatternAddDatasetTerms) transform_registry.register("add_dataset_properties", AddDatasetProperties) +transform_registry.register("simple_add_dataset_properties", SimpleAddDatasetProperties) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 8fa53250a9826..5b8d750acb92a 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -17,6 +17,7 @@ from datahub.ingestion.transformer.add_dataset_properties import ( AddDatasetProperties, AddDatasetPropertiesResolverBase, + SimpleAddDatasetProperties, ) from datahub.ingestion.transformer.add_dataset_tags import ( AddDatasetTags, @@ -75,7 +76,7 @@ def make_dataset_with_properties() -> models.MetadataChangeEventClass: urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)", aspects=[ models.StatusClass(removed=False), - models.DatasetPropertiesClass(customProperties=EXISTING_PROPERTIES), + models.DatasetPropertiesClass(customProperties=EXISTING_PROPERTIES.copy()), ], ), ) @@ -641,6 +642,38 @@ def test_add_dataset_properties(mock_time): } +def test_simple_add_dataset_properties(mock_time): + dataset_mce = make_dataset_with_properties() + + new_properties = { + "new-simple-property": "new-value" + } + transformer = SimpleAddDatasetProperties.create( + { + "properties": new_properties, + }, + PipelineContext(run_id="test-simple-properties"), + ) + + outputs = list( + transformer.transform( + [RecordEnvelope(input, metadata={}) for input in [dataset_mce]] + ) + ) + assert len(outputs) == 1 + + custom_properties = builder.get_aspect_if_available( + outputs[0].record, models.DatasetPropertiesClass + ) + + print(str(custom_properties)) + assert custom_properties is not None + assert custom_properties.customProperties == { + **EXISTING_PROPERTIES, + **new_properties, + } + + def test_simple_dataset_terms_transformation(mock_time): dataset_mce = make_generic_dataset() From 84fd61175b12e7029872328e066e6808599f913d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Go=CC=81mez?= Date: Tue, 21 Dec 2021 13:57:40 +0100 Subject: [PATCH 2/3] fix: fixes linter issues --- .../transformer/add_dataset_properties.py | 20 +++++++++---------- .../tests/unit/test_transform_dataset.py | 8 ++++---- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py index 133032563d1b0..d162c949b5e9e 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py @@ -34,7 +34,9 @@ class AddDatasetProperties(DatasetTransformer): ctx: PipelineContext config: AddDatasetPropertiesConfig - def __init__(self, config: AddDatasetPropertiesConfig, ctx: PipelineContext, **resolver_args): + def __init__( + self, config: AddDatasetPropertiesConfig, ctx: PipelineContext, **resolver_args + ): self.ctx = ctx self.config = config self.resolver_args = resolver_args @@ -48,11 +50,9 @@ def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventCla if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass): return mce - properties_to_add = ( - self.config.add_properties_resolver_class(**self.resolver_args).get_properties_to_add( - mce.proposedSnapshot - ) - ) + properties_to_add = self.config.add_properties_resolver_class( + **self.resolver_args + ).get_properties_to_add(mce.proposedSnapshot) if properties_to_add: properties = builder.get_or_add_aspect( mce, DatasetPropertiesClass(customProperties={}) @@ -81,12 +81,12 @@ def __init__(self, config: SimpleAddDatasetPropertiesConfig, ctx: PipelineContex generic_config = AddDatasetPropertiesConfig( add_properties_resolver_class=SimpleAddDatasetPropertiesResolverClass ) - resolver_args = { - "properties": config.properties - } + resolver_args = {"properties": config.properties} super().__init__(generic_config, ctx, **resolver_args) @classmethod - def create(cls, config_dict: dict, ctx: PipelineContext) -> "SimpleAddDatasetProperties": + def create( + cls, config_dict: dict, ctx: PipelineContext + ) -> "SimpleAddDatasetProperties": config = SimpleAddDatasetPropertiesConfig.parse_obj(config_dict) return cls(config, ctx) diff --git a/metadata-ingestion/tests/unit/test_transform_dataset.py b/metadata-ingestion/tests/unit/test_transform_dataset.py index 5b8d750acb92a..dc462cbb882a9 100644 --- a/metadata-ingestion/tests/unit/test_transform_dataset.py +++ b/metadata-ingestion/tests/unit/test_transform_dataset.py @@ -76,7 +76,9 @@ def make_dataset_with_properties() -> models.MetadataChangeEventClass: urn="urn:li:dataset:(urn:li:dataPlatform:bigquery,example1,PROD)", aspects=[ models.StatusClass(removed=False), - models.DatasetPropertiesClass(customProperties=EXISTING_PROPERTIES.copy()), + models.DatasetPropertiesClass( + customProperties=EXISTING_PROPERTIES.copy() + ), ], ), ) @@ -645,9 +647,7 @@ def test_add_dataset_properties(mock_time): def test_simple_add_dataset_properties(mock_time): dataset_mce = make_dataset_with_properties() - new_properties = { - "new-simple-property": "new-value" - } + new_properties = {"new-simple-property": "new-value"} transformer = SimpleAddDatasetProperties.create( { "properties": new_properties, From 37085afca42155ca7d12d2cf2a2d9c2ac7966096 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Go=CC=81mez?= Date: Tue, 21 Dec 2021 15:08:17 +0100 Subject: [PATCH 3/3] fix: mypy issues --- .../ingestion/transformer/add_dataset_properties.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py index d162c949b5e9e..6e1a0cc4ddc93 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/add_dataset_properties.py @@ -1,5 +1,5 @@ from abc import ABC, abstractmethod -from typing import Dict, Type +from typing import Any, Dict, Type import datahub.emitter.mce_builder as builder from datahub.configuration.common import ConfigModel @@ -35,7 +35,10 @@ class AddDatasetProperties(DatasetTransformer): config: AddDatasetPropertiesConfig def __init__( - self, config: AddDatasetPropertiesConfig, ctx: PipelineContext, **resolver_args + self, + config: AddDatasetPropertiesConfig, + ctx: PipelineContext, + **resolver_args: Dict[str, Any], ): self.ctx = ctx self.config = config @@ -50,7 +53,7 @@ def transform_one(self, mce: MetadataChangeEventClass) -> MetadataChangeEventCla if not isinstance(mce.proposedSnapshot, DatasetSnapshotClass): return mce - properties_to_add = self.config.add_properties_resolver_class( + properties_to_add = self.config.add_properties_resolver_class( # type: ignore **self.resolver_args ).get_properties_to_add(mce.proposedSnapshot) if properties_to_add: