Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingest/stateful): omit irrelevant urns for deletion #11558

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

STATEFUL_INGESTION_IGNORED_ENTITY_TYPES = {
"dataProcessInstance",
"query",
}


Expand Down Expand Up @@ -75,7 +76,10 @@ def auto_stale_entity_removal(

if wu.is_primary_source:
entity_type = guess_entity_type(urn)
if entity_type is not None:
if (
entity_type is not None
and entity_type not in STATEFUL_INGESTION_IGNORED_ENTITY_TYPES
):
stale_entity_removal_handler.add_entity_to_state(entity_type, urn)
else:
stale_entity_removal_handler.add_urn_to_skip(urn)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
"state": {
"formatVersion": "1.0",
"serde": "utf-8",
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\", \"urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619\"]}"
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)\", \"urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619\", \"urn:li:query:query1\"]}"
},
"runId": "dummy-test-stateful-ingestion"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,16 @@
"json": {
"timestampMillis": 1586847600000,
"partitionSpec": {
"type": "FULL_TABLE",
"partition": "FULL_TABLE_SNAPSHOT"
"partition": "FULL_TABLE_SNAPSHOT",
"type": "FULL_TABLE"
},
"pipelineName": "dummy_stateful",
"platformInstanceId": "",
"config": "",
"state": {
"formatVersion": "1.0",
"serde": "utf-8",
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\", \"urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26\"]}"
"payload": "{\"urns\": [\"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset1,PROD)\", \"urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset2,PROD)\"]}"
},
"runId": "dummy-test-stateful-ingestion"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,23 @@
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:query1",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:bigquery"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided",
"pipelineName": "dummy_stateful"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619",
Expand All @@ -84,5 +101,22 @@
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:query1",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided",
"pipelineName": "dummy_stateful"
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,46 @@
}
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided",
"pipelineName": "dummy_stateful"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": true
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
}
},
{
"entityType": "query",
"entityUrn": "urn:li:query:query2",
"changeType": "UPSERT",
"aspectName": "dataPlatformInstance",
"aspect": {
"json": {
"platform": "urn:li:dataPlatform:bigquery"
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided",
"pipelineName": "dummy_stateful"
}
},
{
"entityType": "dataProcessInstance",
"entityUrn": "urn:li:dataProcessInstance:7f26c3b4d2d82ace47f4b9dd0c9dea26",
Expand All @@ -66,23 +100,25 @@
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
"lastRunId": "no-run-id-provided",
"pipelineName": "dummy_stateful"
}
},
{
"entityType": "dataset",
"entityUrn": "urn:li:dataset:(urn:li:dataPlatform:postgres,dummy_dataset3,PROD)",
"entityType": "query",
"entityUrn": "urn:li:query:query2",
"changeType": "UPSERT",
"aspectName": "status",
"aspect": {
"json": {
"removed": true
"removed": false
}
},
"systemMetadata": {
"lastObserved": 1586847600000,
"runId": "dummy-test-stateful-ingestion",
"lastRunId": "no-run-id-provided"
"lastRunId": "no-run-id-provided",
"pipelineName": "dummy_stateful"
}
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@
from datahub.metadata.com.linkedin.pegasus2avro.dataprocess import (
DataProcessInstanceProperties,
)
from datahub.metadata.schema_classes import AuditStampClass, StatusClass
from datahub.metadata.schema_classes import (
AuditStampClass,
DataPlatformInstanceClass,
StatusClass,
)
from datahub.metadata.urns import DataPlatformUrn, QueryUrn
from datahub.utilities.urns.dataset_urn import DatasetUrn
from tests.test_helpers import mce_helpers
from tests.test_helpers.state_helpers import (
Expand Down Expand Up @@ -71,6 +76,9 @@ class DummySourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigMixin):
default=None,
description="Data process instance id to ingest.",
)
query_id_to_ingest: Optional[str] = Field(
default=None, description="Query id to ingest"
)


class DummySource(StatefulIngestionSourceBase):
Expand Down Expand Up @@ -136,6 +144,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
),
).as_workunit()

if self.source_config.query_id_to_ingest:
yield MetadataChangeProposalWrapper(
entityUrn=QueryUrn(self.source_config.query_id_to_ingest).urn(),
aspect=DataPlatformInstanceClass(
platform=DataPlatformUrn("bigquery").urn()
),
).as_workunit()

if self.source_config.report_failure:
self.reporter.report_failure("Dummy error", "Error")

Expand Down Expand Up @@ -188,6 +204,7 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time):
},
},
"dpi_id_to_ingest": "job1",
"query_id_to_ingest": "query1",
},
},
"sink": {
Expand All @@ -198,7 +215,11 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time):

with mock.patch(
"datahub.ingestion.source.state.stale_entity_removal_handler.StaleEntityRemovalHandler._get_state_obj"
) as mock_state:
) as mock_state, mock.patch(
"datahub.ingestion.source.state.stale_entity_removal_handler.STATEFUL_INGESTION_IGNORED_ENTITY_TYPES",
{},
# The second patch empties STATEFUL_INGESTION_IGNORED_ENTITY_TYPES to imitate the earlier behavior, in which no entity-type check was applied when adding an entity to state.
):
mock_state.return_value = GenericCheckpointState(serde="utf-8")
pipeline_run1 = None
pipeline_run1_config: Dict[str, Dict[str, Dict[str, Any]]] = dict( # type: ignore
Expand Down Expand Up @@ -237,6 +258,8 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time):
"allow": ["dummy_dataset1", "dummy_dataset2"],
}
pipeline_run2_config["source"]["config"]["dpi_id_to_ingest"] = "job2"
pipeline_run2_config["source"]["config"]["query_id_to_ingest"] = "query2"

pipeline_run2_config["sink"]["config"][
"filename"
] = f"{tmp_path}/{output_file_name_after_deleted}"
Expand Down Expand Up @@ -288,6 +311,7 @@ def test_stateful_ingestion(pytestconfig, tmp_path, mock_time):
# assert report last ingestion state non_deletable entity urns
non_deletable_urns: List[str] = [
"urn:li:dataProcessInstance:478810e859f870a54f72c681f41af619",
"urn:li:query:query1",
]
assert sorted(non_deletable_urns) == sorted(
report.last_state_non_deletable_entities
Expand Down
Loading