Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stateful ingestion to create cadet databases #396

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions ingestion/create_cadet_databases.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
pipeline_name: create-cadet-databases
source:
type: "ingestion.create_cadet_databases_source.source.CreateCadetDatabases"
config:
manifest_s3_uri: "s3://mojap-derived-tables/prod/run_artefacts/latest/target/manifest.json"
database_metadata_s3_uri: "s3://mojap-derived-tables/prod/run_artefacts/latest/target/database_metadata.json"
stateful_ingestion:
enabled: true
remove_stale_metadata: true
18 changes: 16 additions & 2 deletions ingestion/create_cadet_databases_source/config.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
from datahub.configuration.common import ConfigModel
from typing import Optional

from datahub.ingestion.source.state.stale_entity_removal_handler import (
StatefulStaleMetadataRemovalConfig,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionConfigBase,
)
from pydantic import Field


class CreateCadetDatabasesConfig(ConfigModel):
class CreateCadetDatabasesConfig(StatefulIngestionConfigBase):
manifest_s3_uri: str = Field(
description="s3 path to dbt manifest json", default=None
)
database_metadata_s3_uri: str = Field(
description="s3 path to database_metadata json", default=None
)
stateful_ingestion: Optional[StatefulStaleMetadataRemovalConfig] = Field(
description="""
Can configure whether the ingestion is be be staeful and can remove stale metadata.
see https://datahubproject.io/docs/metadata-ingestion/docs/dev_guides/stateful/#stale-entity-removal
""",
default=None,
)
30 changes: 24 additions & 6 deletions ingestion/create_cadet_databases_source/source.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
import logging
from datetime import datetime
from typing import Iterable
from typing import Iterable, List, Optional

import datahub.emitter.mce_builder as mce_builder
import datahub.emitter.mcp_builder as mcp_builder
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import config_class
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.source import SourceReport, MetadataWorkUnitProcessor
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalSourceReport,
)
from datahub.ingestion.source.state.stale_entity_removal_handler import (
StaleEntityRemovalHandler,
)
from datahub.ingestion.source.state.stateful_ingestion_base import (
StatefulIngestionSourceBase,
)
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.common.subtypes import DatasetContainerSubTypes

from datahub.metadata.schema_classes import (
ChangeTypeClass,
ContainerClass,
DomainsClass,
GlobalTagsClass,
TagAssociationClass,
Expand All @@ -38,20 +49,27 @@


@config_class(CreateCadetDatabasesConfig)
class CreateCadetDatabases(Source):
source_config: CreateCadetDatabasesConfig
report: SourceReport = SourceReport()
class CreateCadetDatabases(StatefulIngestionSourceBase):

@report_time
def __init__(self, config: CreateCadetDatabasesConfig, ctx: PipelineContext):
super().__init__(ctx)
super().__init__(config, ctx)
self.source_config = config
self.report = StaleEntityRemovalSourceReport()

@classmethod
def create(cls, config_dict, ctx):
config = CreateCadetDatabasesConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
return [
*super().get_workunit_processors(),
StaleEntityRemovalHandler.create(
self, self.config, self.ctx
).workunit_processor,
]

@report_generator_time
def get_workunits(self) -> Iterable[MetadataWorkUnit]:
manifest = get_cadet_metadata_json(self.source_config.manifest_s3_uri)
Expand Down
Loading