From e2a44d9303e3f2fa415b3a23db3d2b532deea5ac Mon Sep 17 00:00:00 2001 From: Andrew Sikowitz Date: Tue, 21 Feb 2023 21:27:44 -0500 Subject: [PATCH] fix(ingest): Do not require platform_instance for stateful ingestion (#7397) --- .../src/datahub/ingestion/source/aws/glue.py | 2 +- .../ingestion/source/bigquery_v2/bigquery.py | 2 +- .../datahub/ingestion/source/dbt/dbt_cloud.py | 2 +- .../datahub/ingestion/source/dbt/dbt_core.py | 2 +- .../ingestion/source/iceberg/iceberg.py | 3 +- .../source/iceberg/iceberg_common.py | 13 --------- .../src/datahub/ingestion/source/kafka.py | 18 ++---------- .../src/datahub/ingestion/source/ldap.py | 2 +- .../ingestion/source/looker/looker_source.py | 2 +- .../ingestion/source/looker/lookml_source.py | 2 +- .../ingestion/source/powerbi/powerbi.py | 2 +- .../src/datahub/ingestion/source/pulsar.py | 3 +- .../source/snowflake/snowflake_v2.py | 2 +- .../ingestion/source/sql/sql_common.py | 2 +- .../source/state/stateful_ingestion_base.py | 2 +- .../src/datahub/ingestion/source/tableau.py | 2 +- .../datahub/ingestion/source/unity/source.py | 2 +- .../datahub/ingestion/source_config/pulsar.py | 15 +--------- .../tests/unit/test_kafka_source.py | 29 ------------------- 19 files changed, 18 insertions(+), 89 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index e4004471e6e299..a0d34a18300ca0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -1252,5 +1252,5 @@ def get_data_platform_instance() -> DataPlatformInstanceClass: def get_report(self): return self.report - def get_platform_instance_id(self) -> str: + def get_platform_instance_id(self) -> Optional[str]: return self.source_config.platform_instance or self.platform diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index b7ada4746656a6..25aa3010262230 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -413,7 +413,7 @@ def get_dataplatform_instance_aspect( else: return None - def get_platform_instance_id(self) -> str: + def get_platform_instance_id(self) -> Optional[str]: """ The source identifier such as the specific source host address required for stateful ingestion. Individual subclasses need to override this method appropriately. diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py index 64eddeaaa60da6..4ff6021f2cfca2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_cloud.py @@ -416,7 +416,7 @@ def get_external_url(self, node: DBTNode) -> Optional[str]: # TODO: Once dbt Cloud supports deep linking to specific files, we can use that. return f"https://cloud.getdbt.com/next/accounts/{self.config.account_id}/projects/{self.config.project_id}/develop" - def get_platform_instance_id(self) -> str: + def get_platform_instance_id(self) -> Optional[str]: """The DBT project identifier is used as platform instance.""" return f"{self.platform}_{self.config.project_id}" diff --git a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py index d4492bbf1e9114..ce4088529f6081 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_core.py @@ -483,7 +483,7 @@ def get_external_url(self, node: DBTNode) -> Optional[str]: return self.config.git_info.get_url_for_file_path(node.dbt_file_path) return None - def get_platform_instance_id(self) -> str: + def get_platform_instance_id(self) -> Optional[str]: """The DBT project identifier is used as platform instance.""" project_id = ( diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py index cd66d0b1e57b7b..c4cfe79b942e87 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py @@ -318,8 +318,7 @@ def _get_avro_schema_from_data_type(self, column: NestedField) -> Dict[str, Any] ], } - def get_platform_instance_id(self) -> str: - assert self.config.platform_instance is not None + def get_platform_instance_id(self) -> Optional[str]: return self.config.platform_instance def get_report(self) -> SourceReport: diff --git a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py index 1c9c2d0436beef..046342cc03d9db 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg_common.py @@ -81,19 +81,6 @@ class IcebergSourceConfig(StatefulIngestionConfigBase): ) profiling: IcebergProfilingConfig = IcebergProfilingConfig() - @pydantic.root_validator - def validate_platform_instance(cls: "IcebergSourceConfig", values: Dict) -> Dict: - stateful_ingestion = values.get("stateful_ingestion") - if ( - stateful_ingestion - and stateful_ingestion.enabled - and not values.get("platform_instance") - ): - raise ConfigurationError( - "Enabling Iceberg stateful ingestion requires to specify a platform instance." - ) - return values - @root_validator() def _ensure_one_filesystem_is_configured( cls: "IcebergSourceConfig", values: Dict diff --git a/metadata-ingestion/src/datahub/ingestion/source/kafka.py b/metadata-ingestion/src/datahub/ingestion/source/kafka.py index ee42c0ea4f6fd3..e1fa5b1d47d43b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/source/kafka.py @@ -15,7 +15,7 @@ TopicMetadata, ) -from datahub.configuration.common import AllowDenyPattern, ConfigurationError +from datahub.configuration.common import AllowDenyPattern from datahub.configuration.kafka import KafkaConsumerConnectionConfig from datahub.configuration.source_common import DatasetSourceConfigBase from datahub.emitter.mce_builder import ( @@ -97,19 +97,6 @@ class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase): description="Disables warnings reported for non-AVRO/Protobuf value or key schemas if set.", ) - @pydantic.root_validator - def validate_platform_instance(cls: "KafkaSourceConfig", values: Dict) -> Dict: - stateful_ingestion = values.get("stateful_ingestion") - if ( - stateful_ingestion - and stateful_ingestion.enabled - and not values.get("platform_instance") - ): - raise ConfigurationError( - "Enabling kafka stateful ingestion requires to specify a platform instance." - ) - return values - @dataclass class KafkaSourceReport(StaleEntityRemovalSourceReport): @@ -199,8 +186,7 @@ def init_kafka_admin_client(self) -> None: f"Failed to create Kafka Admin Client due to error {e}.", ) - def get_platform_instance_id(self) -> str: - assert self.source_config.platform_instance is not None + def get_platform_instance_id(self) -> Optional[str]: return self.source_config.platform_instance @classmethod diff --git a/metadata-ingestion/src/datahub/ingestion/source/ldap.py b/metadata-ingestion/src/datahub/ingestion/source/ldap.py index 82e5abc5eab188..518dcdd1fc9916 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ldap.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ldap.py @@ -287,7 +287,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: cookie = set_cookie(self.lc, pctrls) - def get_platform_instance_id(self) -> str: + def get_platform_instance_id(self) -> Optional[str]: """ The source identifier such as the specific source host address required for stateful ingestion. Individual subclasses need to override this method appropriately. diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py index 40901170a03017..bee8fd5d05a0ea 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/looker_source.py @@ -1353,7 +1353,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: def get_report(self) -> SourceReport: return self.reporter - def get_platform_instance_id(self) -> str: + def get_platform_instance_id(self) -> Optional[str]: return self.source_config.platform_instance or self.platform def close(self): diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py index edec408e822ab1..ff456efc0b7e4d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker/lookml_source.py @@ -1746,7 +1746,7 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 def get_report(self): return self.reporter - def get_platform_instance_id(self) -> str: + def get_platform_instance_id(self) -> Optional[str]: return self.source_config.platform_instance or self.platform def close(self): diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py index 972c4654e234b7..3c6f9e7da66f12 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi/powerbi.py @@ -909,7 +909,7 @@ def __init__(self, config: PowerBiDashboardSourceConfig, ctx: PipelineContext): run_id=ctx.run_id, ) - def get_platform_instance_id(self) -> str: + def get_platform_instance_id(self) -> Optional[str]: return self.source_config.platform_name @classmethod diff --git a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py index eccf0c55ba62a1..ab9c62ee4414d9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py +++ b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py @@ -223,8 +223,7 @@ def _get_pulsar_metadata(self, url): f"An ambiguous exception occurred while handling the request: {e}" ) - def get_platform_instance_id(self) -> str: - assert self.config.platform_instance is not None + def get_platform_instance_id(self) -> Optional[str]: return self.config.platform_instance @classmethod diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 23ce11cbe1908d..1962dceb9fd2d7 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -1399,7 +1399,7 @@ def inspect_session_metadata(self) -> None: self.report.edition = None # Stateful Ingestion Overrides. - def get_platform_instance_id(self) -> str: + def get_platform_instance_id(self) -> Optional[str]: return self.config.get_account() # Ideally we do not want null values in sample data for a column. diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py index 38415c11635a86..39b9f223594ebd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py @@ -394,7 +394,7 @@ def get_db_name(self, inspector: Inspector) -> str: def get_schema_names(self, inspector): return inspector.get_schema_names() - def get_platform_instance_id(self) -> str: + def get_platform_instance_id(self) -> Optional[str]: """ The source identifier such as the specific source host address required for stateful ingestion. Individual subclasses need to override this method appropriately. diff --git a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py index ba2ee36e302078..03a3738aee3585 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py +++ b/metadata-ingestion/src/datahub/ingestion/source/state/stateful_ingestion_base.py @@ -281,7 +281,7 @@ def is_checkpointing_enabled(self, job_id: JobId) -> bool: raise ValueError(f"No use-case handler for job_id{job_id}") return self._usecase_handlers[job_id].is_checkpointing_enabled() - def get_platform_instance_id(self) -> str: + def get_platform_instance_id(self) -> Optional[str]: # This method is retained for backwards compatibility, but it is not # required that new sources implement it. We mainly need it for the # fallback logic in _get_last_checkpoint. diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index 72853c91141647..75b9e6cfb2e835 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ -1805,5 +1805,5 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: def get_report(self) -> StaleEntityRemovalSourceReport: return self.report - def get_platform_instance_id(self) -> str: + def get_platform_instance_id(self) -> Optional[str]: return self.config.platform_instance or self.platform diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 521ce6c547acf1..c334da6d159a0b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -156,7 +156,7 @@ def create(cls, config_dict, ctx): config = UnityCatalogSourceConfig.parse_obj(config_dict) return cls(ctx=ctx, config=config) - def get_platform_instance_id(self) -> str: + def get_platform_instance_id(self) -> Optional[str]: return self.config.platform_instance or self.platform def get_workunits(self) -> Iterable[MetadataWorkUnit]: diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py index 249f918c0330ba..c1e2faed65df73 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py +++ b/metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py @@ -2,7 +2,7 @@ from typing import Dict, List, Optional, Union from urllib.parse import urlparse -from pydantic import Field, root_validator, validator +from pydantic import Field, validator from datahub.configuration.common import AllowDenyPattern, ConfigurationError from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigBase @@ -132,16 +132,3 @@ def web_service_url_scheme_host_port(cls, val: str) -> str: ) return config_clean.remove_trailing_slashes(val) - - @root_validator - def validate_platform_instance(cls: "PulsarSourceConfig", values: Dict) -> Dict: - stateful_ingestion = values.get("stateful_ingestion") - if ( - stateful_ingestion - and stateful_ingestion.enabled - and not values.get("platform_instance") - ): - raise ConfigurationError( - "Enabling Pulsar stateful ingestion requires to specify a platform instance." - ) - return values diff --git a/metadata-ingestion/tests/unit/test_kafka_source.py b/metadata-ingestion/tests/unit/test_kafka_source.py index 8ea277866241c4..88d9daf4442167 100644 --- a/metadata-ingestion/tests/unit/test_kafka_source.py +++ b/metadata-ingestion/tests/unit/test_kafka_source.py @@ -8,7 +8,6 @@ Schema, ) -from datahub.configuration.common import ConfigurationError from datahub.emitter.mce_builder import ( make_dataplatform_instance_urn, make_dataset_urn_with_platform_instance, @@ -165,34 +164,6 @@ def test_close(mock_kafka, mock_admin_client): assert mock_kafka_instance.close.call_count == 1 -def test_kafka_source_stateful_ingestion_requires_platform_instance(): - class StatefulProviderMock: - def __init__(self, config, ctx): - self.ctx = ctx - self.config = config - - def is_stateful_ingestion_configured(self): - return self.config.stateful_ingestion.enabled - - ctx = PipelineContext(run_id="test", pipeline_name="test") - with pytest.raises(ConfigurationError) as e: - KafkaSource.create( - { - "stateful_ingestion": { - "enabled": "true", - "fail_safe_threshold": 100.0, - }, - "connection": {"bootstrap": "localhost:9092"}, - }, - ctx, - ) - - assert ( - "Enabling kafka stateful ingestion requires to specify a platform instance" - in str(e) - ) - - @patch( "datahub.ingestion.source.kafka.confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient", autospec=True,