feat(ingest): simplify more stateful ingestion state (datahub-project…
hsheth2 authored and szalai1 committed Dec 22, 2022
1 parent 6f6323a commit c542b61
Showing 14 changed files with 111 additions and 169 deletions.
metadata-ingestion/src/datahub/ingestion/source/iceberg/iceberg.py
@@ -35,7 +35,7 @@
     IcebergSourceReport,
 )
 from datahub.ingestion.source.iceberg.iceberg_profiler import IcebergProfiler
-from datahub.ingestion.source.state.iceberg_state import IcebergCheckpointState
+from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalHandler,
 )
@@ -123,7 +123,7 @@ def __init__(self, config: IcebergSourceConfig, ctx: PipelineContext) -> None:
         self.stale_entity_removal_handler = StaleEntityRemovalHandler(
             source=self,
             config=self.config,
-            state_type_class=IcebergCheckpointState,
+            state_type_class=GenericCheckpointState,
             pipeline_name=self.ctx.pipeline_name,
             run_id=self.ctx.run_id,
         )
metadata-ingestion/src/datahub/ingestion/source/state/dbt_state.py
@@ -16,6 +16,3 @@ class DbtCheckpointState(GenericCheckpointState):
"encoded_assertion_urns": "assertion",
}
)

def prepare_for_commit(self) -> None:
self.urns = list(set(self.urns))
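(Aside: the explicit dedup in prepare_for_commit is no longer needed; GenericCheckpointState now deduplicates urns on construction and in add_checkpoint_urn, as the next file shows.)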
metadata-ingestion/src/datahub/ingestion/source/state/entity_removal_state.py
@@ -1,4 +1,4 @@
-from typing import Dict, Iterable, List, Type
+from typing import Any, Dict, Iterable, List, Type
 
 import pydantic
 
@@ -7,19 +7,32 @@
     StaleEntityCheckpointStateBase,
 )
 from datahub.utilities.checkpoint_state_util import CheckpointStateUtil
+from datahub.utilities.dedup_list import deduplicate_list
 from datahub.utilities.urns.urn import guess_entity_type
 
 
 class GenericCheckpointState(StaleEntityCheckpointStateBase["GenericCheckpointState"]):
     urns: List[str] = pydantic.Field(default_factory=list)
 
+    # We store a bit of extra internal-only state so that we can keep the urns list deduplicated.
+    # However, we still want `urns` to be a list so that it maintains its order.
+    # We can't used OrderedSet here because pydantic doesn't recognize it and
+    # it isn't JSON serializable.
+    _urns_set: set = pydantic.PrivateAttr(default_factory=set)
+
+    def __init__(self, **data: Any):  # type: ignore
+        super().__init__(**data)
+        self.urns = deduplicate_list(self.urns)
+        self._urns_set = set(self.urns)
+
     @classmethod
     def get_supported_types(cls) -> List[str]:
         return ["*"]
 
     def add_checkpoint_urn(self, type: str, urn: str) -> None:
-        # TODO: dedup
-        self.urns.append(urn)
+        if urn not in self._urns_set:
+            self.urns.append(urn)
+            self._urns_set.add(urn)
 
     def get_urns_not_in(
         self, type: str, other_checkpoint_state: "GenericCheckpointState"
@@ -29,6 +42,8 @@ def get_urns_not_in(
         # To maintain backwards compatibility, we provide this filtering mechanism.
         if type == "*":
             yield from diff
+        elif type == "topic":
+            yield from (urn for urn in diff if guess_entity_type(urn) == "dataset")
         else:
             yield from (urn for urn in diff if guess_entity_type(urn) == type)
 
@@ -51,6 +66,7 @@ def pydantic_state_migrator(mapping: Dict[str, str]) -> classmethod:
"dataset",
"container",
"assertion",
"topic",
]
assert set(mapping.values()) <= set(SUPPORTED_TYPES)

@@ -64,6 +80,11 @@ def _validate_field_rename(cls: Type, values: dict) -> dict:
             value = values.pop(old_field)
             if mapped_type == "dataset":
                 values["urns"] += CheckpointStateUtil.get_dataset_urns_not_in(value, [])
+            elif mapped_type == "topic":
+                values["urns"] += [
+                    CheckpointStateUtil.get_urn_from_encoded_topic(encoded_urn)
+                    for encoded_urn in value
+                ]
             elif mapped_type == "container":
                 values["urns"] += [make_container_urn(guid) for guid in value]
             elif mapped_type == "assertion":
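Two things are worth noting in this file: urns are now deduplicated while preserving list order, and the type == "topic" special case exists because Kafka topics are stored as dataset urns (see the kafka_state.py migration below). A minimal sketch of the new dedup behavior, with made-up urn values:

    from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState

    # Duplicates in a previously serialized state are collapsed on construction,
    # and the original ordering is preserved.
    state = GenericCheckpointState(
        urns=["urn:li:corpuser:alice", "urn:li:corpuser:alice", "urn:li:corpuser:bob"]
    )
    assert state.urns == ["urn:li:corpuser:alice", "urn:li:corpuser:bob"]

    # Repeated adds are now no-ops instead of growing the list.
    state.add_checkpoint_urn(type="corpuser", urn="urn:li:corpuser:bob")
    assert state.urns == ["urn:li:corpuser:alice", "urn:li:corpuser:bob"]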

metadata-ingestion/src/datahub/ingestion/source/state/iceberg_state.py
This file was deleted.

metadata-ingestion/src/datahub/ingestion/source/state/kafka_state.py
@@ -1,66 +1,17 @@
-from typing import Iterable, List
-
-import pydantic
-
-from datahub.emitter.mce_builder import dataset_urn_to_key, make_dataset_urn
-from datahub.ingestion.source.state.stale_entity_removal_handler import (
-    StaleEntityCheckpointStateBase,
+from datahub.ingestion.source.state.entity_removal_state import (
+    GenericCheckpointState,
+    pydantic_state_migrator,
 )
 
 
-class KafkaCheckpointState(StaleEntityCheckpointStateBase["KafkaCheckpointState"]):
+class KafkaCheckpointState(GenericCheckpointState):
     """
-    This Class represents the checkpoint state for Kafka based sources.
+    This class represents the checkpoint state for Kafka based sources.
     Stores all the topics being ingested and it is used to remove any stale entities.
     """
 
-    encoded_topic_urns: List[str] = pydantic.Field(default_factory=list)
-
-    @classmethod
-    def get_supported_types(cls) -> List[str]:
-        return ["topic"]
-
-    @staticmethod
-    def _get_separator() -> str:
-        # Unique small string not allowed in URNs.
-        return "||"
-
-    @staticmethod
-    def _get_lightweight_repr(dataset_urn: str) -> str:
-        """Reduces the amount of text in the URNs for smaller state footprint."""
-        SEP = KafkaCheckpointState._get_separator()
-        key = dataset_urn_to_key(dataset_urn)
-        assert key is not None
-        return f"{key.platform}{SEP}{key.name}{SEP}{key.origin}"
-
-    @staticmethod
-    def _get_urns_not_in(
-        encoded_urns_1: List[str], encoded_urns_2: List[str]
-    ) -> Iterable[str]:
-        difference = set(encoded_urns_1) - set(encoded_urns_2)
-        for encoded_urn in difference:
-            platform, name, env = encoded_urn.split(
-                KafkaCheckpointState._get_separator()
-            )
-            yield make_dataset_urn(platform, name, env)
-
-    def add_checkpoint_urn(self, type: str, urn: str) -> None:
-        assert type in self.get_supported_types()
-        if type == "topic":
-            self.encoded_topic_urns.append(self._get_lightweight_repr(urn))
-
-    def get_urns_not_in(
-        self, type: str, other_checkpoint_state: "KafkaCheckpointState"
-    ) -> Iterable[str]:
-        assert type in self.get_supported_types()
-        if type == "topic":
-            yield from self._get_urns_not_in(
-                self.encoded_topic_urns, other_checkpoint_state.encoded_topic_urns
-            )
-
-    def get_percent_entities_changed(
-        self, old_checkpoint_state: "KafkaCheckpointState"
-    ) -> float:
-        return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
-            [(self.encoded_topic_urns, old_checkpoint_state.encoded_topic_urns)]
-        )
+    _migration = pydantic_state_migrator(
+        {
+            "encoded_topic_urns": "topic",
+        }
+    )
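A hedged sketch of what the migrator does for a legacy serialized Kafka state; the encoded value follows the platform||name||env format from the deleted _get_lightweight_repr, and the expected urn assumes the standard make_dataset_urn output:

    from datahub.ingestion.source.state.kafka_state import KafkaCheckpointState

    # A checkpoint serialized by the old class still deserializes cleanly: the
    # migrator pops the legacy field and decodes each entry into a dataset urn
    # via CheckpointStateUtil.get_urn_from_encoded_topic.
    old_payload = {"encoded_topic_urns": ["kafka||my_topic||PROD"]}
    state = KafkaCheckpointState(**old_payload)
    assert state.urns == ["urn:li:dataset:(urn:li:dataPlatform:kafka,my_topic,PROD)"]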
metadata-ingestion/src/datahub/ingestion/source/state/ldap_state.py
@@ -1,39 +1,8 @@
-from typing import Iterable, List
-
-import pydantic
-
-from datahub.ingestion.source.state.stale_entity_removal_handler import (
-    StaleEntityCheckpointStateBase,
-)
-from datahub.utilities.urns.urn import guess_entity_type
+from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
 
 
-class LdapCheckpointState(StaleEntityCheckpointStateBase["LdapCheckpointState"]):
+class LdapCheckpointState(GenericCheckpointState):
     """
     Base class for representing the checkpoint state for all LDAP based sources.
     Stores all corpuser and corpGroup and being ingested and is used to remove any stale entities.
     """
-
-    urns: List[str] = pydantic.Field(default_factory=list)
-
-    @classmethod
-    def get_supported_types(cls) -> List[str]:
-        return ["corpuser", "corpGroup"]
-
-    def add_checkpoint_urn(self, type: str, urn: str) -> None:
-        assert type in self.get_supported_types()
-        self.urns.append(urn)
-
-    def get_urns_not_in(
-        self, type: str, other_checkpoint_state: "LdapCheckpointState"
-    ) -> Iterable[str]:
-        assert type in self.get_supported_types()
-        diff = set(self.urns) - set(other_checkpoint_state.urns)
-        yield from (urn for urn in diff if guess_entity_type(urn) == type)
-
-    def get_percent_entities_changed(
-        self, old_checkpoint_state: "LdapCheckpointState"
-    ) -> float:
-        return StaleEntityCheckpointStateBase.compute_percent_entities_changed(
-            [(self.urns, old_checkpoint_state.urns)]
-        )
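Since LdapCheckpointState already stored plain urns in a urns field, no field migration is needed here; the inherited type-filtered diffing covers corpuser and corpGroup. An illustrative sketch:

    from datahub.ingestion.source.state.ldap_state import LdapCheckpointState

    state1 = LdapCheckpointState(urns=["urn:li:corpuser:alice", "urn:li:corpGroup:eng"])
    state2 = LdapCheckpointState(urns=["urn:li:corpGroup:eng"])
    # guess_entity_type-based filtering in GenericCheckpointState replaces the
    # hand-written implementation deleted above.
    assert list(state1.get_urns_not_in("corpuser", state2)) == ["urn:li:corpuser:alice"]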

metadata-ingestion/src/datahub/ingestion/source/state/tableau_state.py
This file was deleted.

4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -37,6 +37,7 @@
 )
 from datahub.ingestion.api.source import Source
 from datahub.ingestion.api.workunit import MetadataWorkUnit
+from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalHandler,
     StaleEntityRemovalSourceReport,
@@ -46,7 +47,6 @@
     StatefulIngestionConfigBase,
     StatefulIngestionSourceBase,
 )
-from datahub.ingestion.source.state.tableau_state import TableauCheckpointState
 from datahub.ingestion.source.tableau_common import (
     FIELD_TYPE_MAPPING,
     MetadataQueryException,
@@ -316,7 +316,7 @@ def __init__(
         self.stale_entity_removal_handler = StaleEntityRemovalHandler(
             source=self,
             config=self.config,
-            state_type_class=TableauCheckpointState,
+            state_type_class=GenericCheckpointState,
             pipeline_name=self.ctx.pipeline_name,
             run_id=self.ctx.run_id,
         )
metadata-ingestion/src/datahub/utilities/checkpoint_state_util.py
@@ -1,6 +1,10 @@
 from typing import Iterable, List, Set
 
-from datahub.emitter.mce_builder import dataset_key_to_urn, dataset_urn_to_key
+from datahub.emitter.mce_builder import (
+    dataset_key_to_urn,
+    dataset_urn_to_key,
+    make_dataset_urn,
+)
 from datahub.metadata.schema_classes import DatasetKeyClass
 
 
@@ -39,3 +43,8 @@ def get_dataset_urns_not_in(
             yield dataset_key_to_urn(
                 DatasetKeyClass(platform=platform, name=name, origin=env)
             )
+
+    @staticmethod
+    def get_urn_from_encoded_topic(encoded_urn: str) -> str:
+        platform, name, env = encoded_urn.split(CheckpointStateUtil.get_separator())
+        return make_dataset_urn(platform, name, env)
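For reference, a small usage sketch of the helper added above (assuming CheckpointStateUtil.get_separator() returns the same "||" separator the old Kafka encoding used):

    from datahub.utilities.checkpoint_state_util import CheckpointStateUtil

    # Decodes the legacy lightweight topic encoding back into a full dataset urn.
    urn = CheckpointStateUtil.get_urn_from_encoded_topic("kafka||my_topic||PROD")
    assert urn == "urn:li:dataset:(urn:li:dataPlatform:kafka,my_topic,PROD)"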
8 changes: 4 additions & 4 deletions metadata-ingestion/tests/integration/iceberg/test_iceberg.py
@@ -10,7 +10,7 @@
 from datahub.ingestion.run.pipeline import Pipeline
 from datahub.ingestion.source.iceberg.iceberg import IcebergSource
 from datahub.ingestion.source.state.checkpoint import Checkpoint
-from datahub.ingestion.source.state.iceberg_state import IcebergCheckpointState
+from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
 from tests.test_helpers import mce_helpers
 from tests.test_helpers.state_helpers import (
     run_and_get_pipeline,
@@ -24,7 +24,7 @@

 def get_current_checkpoint_from_pipeline(
     pipeline: Pipeline,
-) -> Optional[Checkpoint]:
+) -> Optional[Checkpoint[GenericCheckpointState]]:
     iceberg_source = cast(IcebergSource, pipeline.source)
     return iceberg_source.get_current_checkpoint(
         iceberg_source.stale_entity_removal_handler.job_id
@@ -153,8 +153,8 @@ def test_iceberg_stateful_ingest(pytestconfig, tmp_path, mock_time, mock_datahub

     # Perform all assertions on the states. The deleted table should not be
     # part of the second state
-    state1 = cast(IcebergCheckpointState, checkpoint1.state)
-    state2 = cast(IcebergCheckpointState, checkpoint2.state)
+    state1 = checkpoint1.state
+    state2 = checkpoint2.state
     difference_urns = list(
         state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2)
     )
metadata-ingestion/tests/integration/tableau/test_tableau.py
@@ -10,7 +10,7 @@
 from datahub.configuration.source_common import DEFAULT_ENV
 from datahub.ingestion.run.pipeline import Pipeline, PipelineContext
 from datahub.ingestion.source.state.checkpoint import Checkpoint
-from datahub.ingestion.source.state.tableau_state import TableauCheckpointState
+from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
 from datahub.ingestion.source.tableau import TableauSource
 from datahub.ingestion.source.tableau_common import (
     TableauLineageOverrides,
@@ -145,7 +145,7 @@ def tableau_ingest_common(

 def get_current_checkpoint_from_pipeline(
     pipeline: Pipeline,
-) -> Optional[Checkpoint]:
+) -> Optional[Checkpoint[GenericCheckpointState]]:
     tableau_source = cast(TableauSource, pipeline.source)
     return tableau_source.get_current_checkpoint(
         tableau_source.stale_entity_removal_handler.job_id
@@ -320,8 +320,8 @@ def test_tableau_stateful(pytestconfig, tmp_path, mock_time, mock_datahub_graph)

     # Perform all assertions on the states. The deleted table should not be
     # part of the second state
-    state1 = cast(TableauCheckpointState, checkpoint1.state)
-    state2 = cast(TableauCheckpointState, checkpoint2.state)
+    state1 = checkpoint1.state
+    state2 = checkpoint2.state
 
     difference_dataset_urns = list(
         state1.get_urns_not_in(type="dataset", other_checkpoint_state=state2)