Skip to content

Commit

Permalink
fix(ingest): Do not require platform_instance for stateful ingestion (d…
Browse files Browse the repository at this point in the history
  • Loading branch information
asikowitz authored and Oleg Ruban committed Feb 28, 2023
1 parent fd3942b commit e2a44d9
Show file tree
Hide file tree
Showing 19 changed files with 18 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1252,5 +1252,5 @@ def get_data_platform_instance() -> DataPlatformInstanceClass:
def get_report(self):
return self.report

def get_platform_instance_id(self) -> str:
def get_platform_instance_id(self) -> Optional[str]:
return self.source_config.platform_instance or self.platform
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def get_dataplatform_instance_aspect(
else:
return None

def get_platform_instance_id(self) -> str:
def get_platform_instance_id(self) -> Optional[str]:
"""
The source identifier such as the specific source host address required for stateful ingestion.
Individual subclasses need to override this method appropriately.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ def get_external_url(self, node: DBTNode) -> Optional[str]:
# TODO: Once dbt Cloud supports deep linking to specific files, we can use that.
return f"https://cloud.getdbt.com/next/accounts/{self.config.account_id}/projects/{self.config.project_id}/develop"

def get_platform_instance_id(self) -> str:
def get_platform_instance_id(self) -> Optional[str]:
"""The DBT project identifier is used as platform instance."""

return f"{self.platform}_{self.config.project_id}"
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,7 @@ def get_external_url(self, node: DBTNode) -> Optional[str]:
return self.config.git_info.get_url_for_file_path(node.dbt_file_path)
return None

def get_platform_instance_id(self) -> str:
def get_platform_instance_id(self) -> Optional[str]:
"""The DBT project identifier is used as platform instance."""

project_id = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,7 @@ def _get_avro_schema_from_data_type(self, column: NestedField) -> Dict[str, Any]
],
}

def get_platform_instance_id(self) -> str:
assert self.config.platform_instance is not None
def get_platform_instance_id(self) -> Optional[str]:
return self.config.platform_instance

def get_report(self) -> SourceReport:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,6 @@ class IcebergSourceConfig(StatefulIngestionConfigBase):
)
profiling: IcebergProfilingConfig = IcebergProfilingConfig()

@pydantic.root_validator
def validate_platform_instance(cls: "IcebergSourceConfig", values: Dict) -> Dict:
stateful_ingestion = values.get("stateful_ingestion")
if (
stateful_ingestion
and stateful_ingestion.enabled
and not values.get("platform_instance")
):
raise ConfigurationError(
"Enabling Iceberg stateful ingestion requires to specify a platform instance."
)
return values

@root_validator()
def _ensure_one_filesystem_is_configured(
cls: "IcebergSourceConfig", values: Dict
Expand Down
18 changes: 2 additions & 16 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
TopicMetadata,
)

from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.configuration.common import AllowDenyPattern
from datahub.configuration.kafka import KafkaConsumerConnectionConfig
from datahub.configuration.source_common import DatasetSourceConfigBase
from datahub.emitter.mce_builder import (
Expand Down Expand Up @@ -97,19 +97,6 @@ class KafkaSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
description="Disables warnings reported for non-AVRO/Protobuf value or key schemas if set.",
)

@pydantic.root_validator
def validate_platform_instance(cls: "KafkaSourceConfig", values: Dict) -> Dict:
stateful_ingestion = values.get("stateful_ingestion")
if (
stateful_ingestion
and stateful_ingestion.enabled
and not values.get("platform_instance")
):
raise ConfigurationError(
"Enabling kafka stateful ingestion requires to specify a platform instance."
)
return values


@dataclass
class KafkaSourceReport(StaleEntityRemovalSourceReport):
Expand Down Expand Up @@ -199,8 +186,7 @@ def init_kafka_admin_client(self) -> None:
f"Failed to create Kafka Admin Client due to error {e}.",
)

def get_platform_instance_id(self) -> str:
assert self.source_config.platform_instance is not None
def get_platform_instance_id(self) -> Optional[str]:
return self.source_config.platform_instance

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/source/ldap.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:

cookie = set_cookie(self.lc, pctrls)

def get_platform_instance_id(self) -> str:
def get_platform_instance_id(self) -> Optional[str]:
"""
The source identifier such as the specific source host address required for stateful ingestion.
Individual subclasses need to override this method appropriately.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1353,7 +1353,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
def get_report(self) -> SourceReport:
return self.reporter

def get_platform_instance_id(self) -> str:
def get_platform_instance_id(self) -> Optional[str]:
return self.source_config.platform_instance or self.platform

def close(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1746,7 +1746,7 @@ def get_internal_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
def get_report(self):
return self.reporter

def get_platform_instance_id(self) -> str:
def get_platform_instance_id(self) -> Optional[str]:
return self.source_config.platform_instance or self.platform

def close(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ def __init__(self, config: PowerBiDashboardSourceConfig, ctx: PipelineContext):
run_id=ctx.run_id,
)

def get_platform_instance_id(self) -> str:
def get_platform_instance_id(self) -> Optional[str]:
return self.source_config.platform_name

@classmethod
Expand Down
3 changes: 1 addition & 2 deletions metadata-ingestion/src/datahub/ingestion/source/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,7 @@ def _get_pulsar_metadata(self, url):
f"An ambiguous exception occurred while handling the request: {e}"
)

def get_platform_instance_id(self) -> str:
assert self.config.platform_instance is not None
def get_platform_instance_id(self) -> Optional[str]:
return self.config.platform_instance

@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1399,7 +1399,7 @@ def inspect_session_metadata(self) -> None:
self.report.edition = None

# Stateful Ingestion Overrides.
def get_platform_instance_id(self) -> str:
def get_platform_instance_id(self) -> Optional[str]:
return self.config.get_account()

# Ideally we do not want null values in sample data for a column.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ def get_db_name(self, inspector: Inspector) -> str:
def get_schema_names(self, inspector):
return inspector.get_schema_names()

def get_platform_instance_id(self) -> str:
def get_platform_instance_id(self) -> Optional[str]:
"""
The source identifier such as the specific source host address required for stateful ingestion.
Individual subclasses need to override this method appropriately.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ def is_checkpointing_enabled(self, job_id: JobId) -> bool:
raise ValueError(f"No use-case handler for job_id{job_id}")
return self._usecase_handlers[job_id].is_checkpointing_enabled()

def get_platform_instance_id(self) -> str:
def get_platform_instance_id(self) -> Optional[str]:
# This method is retained for backwards compatibility, but it is not
# required that new sources implement it. We mainly need it for the
# fallback logic in _get_last_checkpoint.
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/source/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -1805,5 +1805,5 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
def get_report(self) -> StaleEntityRemovalSourceReport:
return self.report

def get_platform_instance_id(self) -> str:
def get_platform_instance_id(self) -> Optional[str]:
return self.config.platform_instance or self.platform
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ def create(cls, config_dict, ctx):
config = UnityCatalogSourceConfig.parse_obj(config_dict)
return cls(ctx=ctx, config=config)

def get_platform_instance_id(self) -> str:
def get_platform_instance_id(self) -> Optional[str]:
return self.config.platform_instance or self.platform

def get_workunits(self) -> Iterable[MetadataWorkUnit]:
Expand Down
15 changes: 1 addition & 14 deletions metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Dict, List, Optional, Union
from urllib.parse import urlparse

from pydantic import Field, root_validator, validator
from pydantic import Field, validator

from datahub.configuration.common import AllowDenyPattern, ConfigurationError
from datahub.configuration.source_common import DEFAULT_ENV, DatasetSourceConfigBase
Expand Down Expand Up @@ -132,16 +132,3 @@ def web_service_url_scheme_host_port(cls, val: str) -> str:
)

return config_clean.remove_trailing_slashes(val)

@root_validator
def validate_platform_instance(cls: "PulsarSourceConfig", values: Dict) -> Dict:
stateful_ingestion = values.get("stateful_ingestion")
if (
stateful_ingestion
and stateful_ingestion.enabled
and not values.get("platform_instance")
):
raise ConfigurationError(
"Enabling Pulsar stateful ingestion requires to specify a platform instance."
)
return values
29 changes: 0 additions & 29 deletions metadata-ingestion/tests/unit/test_kafka_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
Schema,
)

from datahub.configuration.common import ConfigurationError
from datahub.emitter.mce_builder import (
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
Expand Down Expand Up @@ -165,34 +164,6 @@ def test_close(mock_kafka, mock_admin_client):
assert mock_kafka_instance.close.call_count == 1


def test_kafka_source_stateful_ingestion_requires_platform_instance():
class StatefulProviderMock:
def __init__(self, config, ctx):
self.ctx = ctx
self.config = config

def is_stateful_ingestion_configured(self):
return self.config.stateful_ingestion.enabled

ctx = PipelineContext(run_id="test", pipeline_name="test")
with pytest.raises(ConfigurationError) as e:
KafkaSource.create(
{
"stateful_ingestion": {
"enabled": "true",
"fail_safe_threshold": 100.0,
},
"connection": {"bootstrap": "localhost:9092"},
},
ctx,
)

assert (
"Enabling kafka stateful ingestion requires to specify a platform instance"
in str(e)
)


@patch(
"datahub.ingestion.source.kafka.confluent_kafka.schema_registry.schema_registry_client.SchemaRegistryClient",
autospec=True,
Expand Down

0 comments on commit e2a44d9

Please sign in to comment.