Skip to content
This repository has been archived by the owner on Jan 27, 2025. It is now read-only.

Commit

Permalink
feat(ingest): enable container stateful ingestion (datahub-project#6343)
Browse files Browse the repository at this point in the history
  • Loading branch information
wangsaisai authored and cccs-Dustin committed Feb 1, 2023
1 parent b33d0b8 commit f3c363b
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

from datahub.configuration.common import AllowDenyPattern
from datahub.emitter.mce_builder import (
make_container_urn,
make_data_platform_urn,
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
Expand Down Expand Up @@ -589,6 +590,12 @@ def gen_database_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
domain_urn=domain_urn,
)

# Add container to the checkpoint state
container_urn = make_container_urn(database_container_key.guid())
self.stale_entity_removal_handler.add_entity_to_state(
"container", container_urn
)

for wu in container_workunits:
self.report.report_workunit(wu)
yield wu
Expand All @@ -610,6 +617,12 @@ def gen_schema_containers(
database_container_key,
)

# Add container to the checkpoint state
container_urn = make_container_urn(schema_container_key.guid())
self.stale_entity_removal_handler.add_entity_to_state(
"container", container_urn
)

for wu in container_workunits:
self.report.report_workunit(wu)
yield wu
Expand Down

0 comments on commit f3c363b

Please sign in to comment.