refactor(ingest/stateful): remove IngestionJobStateProvider #6792

Merged · 8 commits · Dec 19, 2022
Changes from 6 commits
6 changes: 4 additions & 2 deletions metadata-ingestion/src/datahub/cli/state_cli.py
@@ -6,7 +6,9 @@
 from click_default_group import DefaultGroup
 
 from datahub.cli.cli_utils import get_url_and_token
-from datahub.ingestion.api.ingestion_job_state_provider import IngestionJobStateProvider
+from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
+    IngestionCheckpointingProviderBase,
+)
 from datahub.ingestion.graph.client import DataHubGraph, DataHubGraphConfig
 from datahub.ingestion.source.state.checkpoint import Checkpoint
 from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
@@ -49,7 +51,7 @@ def inspect(pipeline_name: str, platform: str, platform_instance: str) -> None:
 
     job_name = StaleEntityRemovalHandler.compute_job_id(platform)
 
-    data_job_urn = IngestionJobStateProvider.get_data_job_urn(
+    data_job_urn = IngestionCheckpointingProviderBase.get_data_job_urn(
         DatahubIngestionCheckpointingProvider.orchestrator_name,
         pipeline_name,
         job_name,
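Note: get_data_job_urn is unchanged in behavior here; only its home class moved. A minimal sketch of the call the CLI now makes, with hypothetical pipeline and instance names (the resulting URN shape assumes make_data_job_urn's flow-id convention of pipeline_name_platform_instance_id):

from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
    IngestionCheckpointingProviderBase,
    JobId,
)

# Hypothetical inputs; "datahub" mirrors the built-in provider's orchestrator_name.
urn = IngestionCheckpointingProviderBase.get_data_job_urn(
    "datahub",
    "my_mysql_pipeline",
    JobId("stale_entity_removal"),
    "my_platform_instance",
)
# Expected shape (approximate):
# urn:li:dataJob:(urn:li:dataFlow:(datahub,my_mysql_pipeline_my_platform_instance,prod),stale_entity_removal)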
15 changes: 4 additions & 11 deletions metadata-ingestion/src/datahub/ingestion/api/committable.py
@@ -1,7 +1,7 @@
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
 from enum import Enum, auto
-from typing import Generic, List, Optional, TypeVar
+from typing import Generic, TypeVar
 
 
 class CommitPolicy(Enum):
@@ -14,7 +14,6 @@ class CommitPolicy(Enum):
 class Committable(ABC):
     name: str
     commit_policy: CommitPolicy
-    committed: bool = False
 
     @abstractmethod
     def commit(self) -> None:
@@ -23,28 +22,22 @@ def commit(self) -> None:
 
 StateKeyType = TypeVar("StateKeyType")
 StateType = TypeVar("StateType")
-# TODO: Add a better alternative to a string for the filter.
-FilterType = TypeVar("FilterType")
 
 
 class StatefulCommittable(
     Committable,
-    Generic[StateKeyType, StateType, FilterType],
+    Generic[StateKeyType, StateType],
 ):
     def __init__(
         self, name: str, commit_policy: CommitPolicy, state_to_commit: StateType
     ):
         super().__init__(name=name, commit_policy=commit_policy)
+        self.committed: bool = False
         self.state_to_commit: StateType = state_to_commit
 
     def has_successfully_committed(self) -> bool:
         return bool(not self.state_to_commit or self.committed)
 
     @abstractmethod
-    def get_previous_states(
-        self,
-        state_key: StateKeyType,
-        last_only: bool = True,
-        filter_opt: Optional[FilterType] = None,
-    ) -> List[StateType]:
+    def get_last_state(self, state_key: StateKeyType) -> StateType:
         pass
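For context, the contract shrinks from a history query (get_previous_states with last_only and filter_opt) to a single lookup. A hypothetical in-memory subclass, purely illustrative of the new surface and not code from this PR:

from typing import Dict

from datahub.ingestion.api.committable import CommitPolicy, StatefulCommittable

ToyState = Dict[str, str]  # illustrative state payload


class InMemoryCommittable(StatefulCommittable[str, ToyState]):
    """Hypothetical committable that keeps only the last committed state."""

    def __init__(self, state_to_commit: ToyState) -> None:
        super().__init__(
            name="in-memory",
            commit_policy=CommitPolicy.ON_NO_ERRORS,
            state_to_commit=state_to_commit,
        )
        self._last: Dict[str, ToyState] = {}

    def get_last_state(self, state_key: str) -> ToyState:
        # Callers can now only ask for the most recent state per key.
        return self._last.get(state_key, {})

    def commit(self) -> None:
        self._last[self.name] = dict(self.state_to_commit)
        self.committed = True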
12 changes: 0 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/api/common.py
@@ -56,7 +56,6 @@ def __init__(
         self.pipeline_name = pipeline_name
         self.dry_run_mode = dry_run
         self.preview_mode = preview_mode
-        self.reporters: Dict[str, Committable] = {}
         self.checkpointers: Dict[str, Committable] = {}
         try:
             self.graph = DataHubGraph(datahub_api) if datahub_api is not None else None
@@ -83,16 +82,5 @@ def register_checkpointer(self, committable: Committable) -> None:
             )
         self.checkpointers[committable.name] = committable
 
-    def register_reporter(self, committable: Committable) -> None:
-        if committable.name in self.reporters:
-            raise IndexError(
-                f"Reporting provider {committable.name} already registered."
-            )
-        self.reporters[committable.name] = committable
-
-    def get_reporters(self) -> Iterable[Committable]:
-        yield from self.reporters.values()
-
     def get_committables(self) -> Iterable[Tuple[str, Committable]]:
-        yield from self.reporters.items()
         yield from self.checkpointers.items()
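With reporting providers gone, PipelineContext carries checkpointers only. A hedged sketch of the remaining registration surface; NoopCheckpointer is a made-up stand-in for a real provider:

from datahub.ingestion.api.committable import Committable, CommitPolicy
from datahub.ingestion.api.common import PipelineContext


class NoopCheckpointer(Committable):
    """Hypothetical checkpointer that commits nothing."""

    def __init__(self) -> None:
        super().__init__(name="noop", commit_policy=CommitPolicy.ON_NO_ERRORS)

    def commit(self) -> None:
        pass


ctx = PipelineContext(run_id="demo-run", pipeline_name="demo_pipeline")
ctx.register_checkpointer(NoopCheckpointer())  # re-registering "noop" would raise IndexError

# get_committables() now yields checkpointers only; reporters are no longer interleaved.
for name, committable in ctx.get_committables():
    print(name, committable.commit_policy)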
metadata-ingestion/src/datahub/ingestion/api/ingestion_job_checkpointing_provider_base.py
@@ -1,64 +1,73 @@
+from abc import abstractmethod
 from dataclasses import dataclass
-from typing import Any, Dict, List, Optional
+from typing import Any, Dict, List, NewType, Type, TypeVar
 
-from datahub.ingestion.api.committable import CommitPolicy
+import datahub.emitter.mce_builder as builder
+from datahub.configuration.common import ConfigModel
+from datahub.ingestion.api.committable import CommitPolicy, StatefulCommittable
 from datahub.ingestion.api.common import PipelineContext
-from datahub.ingestion.api.ingestion_job_state_provider import (
-    IngestionJobStateProvider,
-    IngestionJobStateProviderConfig,
-    JobId,
-    JobStateFilterType,
-    JobStateKey,
-    JobStatesMap,
-)
 from datahub.metadata.schema_classes import DatahubIngestionCheckpointClass
 
-#
-# Common type exports
-#
-JobId = JobId
-JobStateKey = JobStateKey
-JobStateFilterType = JobStateFilterType
-
-#
-# Checkpoint state specific types
-#
+JobId = NewType("JobId", str)
 CheckpointJobStateType = DatahubIngestionCheckpointClass
-CheckpointJobStatesMap = JobStatesMap[CheckpointJobStateType]
+CheckpointJobStatesMap = Dict[JobId, CheckpointJobStateType]
 
 
-class IngestionCheckpointingProviderConfig(IngestionJobStateProviderConfig):
+@dataclass
+class JobStateKey:
+    pipeline_name: str
+    platform_instance_id: str
+    job_names: List[JobId]
+
+
+class IngestionCheckpointingProviderConfig(ConfigModel):
     pass
 
 
+_Self = TypeVar("_Self", bound="IngestionCheckpointingProviderBase")
+
+
 @dataclass()
 class IngestionCheckpointingProviderBase(
-    IngestionJobStateProvider[CheckpointJobStateType]
+    StatefulCommittable[JobStateKey, CheckpointJobStatesMap]
 ):
     """
-    The base class(non-abstract) for all checkpointing state provider implementations.
-    This class is implemented this way as a concrete class is needed to work with the registry,
-    but we don't want to implement any of the functionality yet.
+    The base class for all checkpointing state provider implementations.
     """
 
     def __init__(
         self, name: str, commit_policy: CommitPolicy = CommitPolicy.ON_NO_ERRORS
     ):
-        super(IngestionCheckpointingProviderBase, self).__init__(name, commit_policy)
+        # Set the initial state to an empty dict.
+        super().__init__(name, commit_policy, {})
 
     @classmethod
     def create(
-        cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str
-    ) -> "IngestionJobStateProvider":
+        cls: Type[_Self], config_dict: Dict[str, Any], ctx: PipelineContext, name: str
+    ) -> "_Self":
         raise NotImplementedError("Sub-classes must override this method.")
 
-    def get_previous_states(
+    @abstractmethod
+    def get_last_state(
         self,
         state_key: JobStateKey,
-        last_only: bool = True,
-        filter_opt: Optional[JobStateFilterType] = None,
-    ) -> List[CheckpointJobStatesMap]:
-        raise NotImplementedError("Sub-classes must override this method.")
+    ) -> CheckpointJobStatesMap:
+        ...
 
+    @abstractmethod
     def commit(self) -> None:
-        raise NotImplementedError("Sub-classes must override this method.")
+        ...
+
+    @staticmethod
+    def get_data_job_urn(
+        orchestrator: str,
+        pipeline_name: str,
+        job_name: JobId,
+        platform_instance_id: str,
+    ) -> str:
+        """
+        Standardizes datajob urn minting for all ingestion job state providers.
+        """
+        return builder.make_data_job_urn(
+            orchestrator, f"{pipeline_name}_{platform_instance_id}", job_name
+        )

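Taken together, a concrete provider now implements exactly create, get_last_state, and commit. The shipped implementation is DatahubIngestionCheckpointingProvider; the toy below is a hypothetical in-memory sketch of the same contract, for illustration only:

from typing import Any, Dict

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import (
    CheckpointJobStatesMap,
    IngestionCheckpointingProviderBase,
    JobStateKey,
)


class InMemoryCheckpointingProvider(IngestionCheckpointingProviderBase):
    """Hypothetical provider that keeps the last run's job states in memory."""

    _last_run_states: CheckpointJobStatesMap = {}

    @classmethod
    def create(
        cls, config_dict: Dict[str, Any], ctx: PipelineContext, name: str
    ) -> "InMemoryCheckpointingProvider":
        return cls(name)

    def get_last_state(self, state_key: JobStateKey) -> CheckpointJobStatesMap:
        # Return only the jobs the caller asked about.
        return {
            job_id: state
            for job_id, state in self._last_run_states.items()
            if job_id in state_key.job_names
        }

    def commit(self) -> None:
        # state_to_commit is the JobId -> DatahubIngestionCheckpointClass map
        # staged during the run; a real provider would persist each entry.
        type(self)._last_run_states = dict(self.state_to_commit)
        self.committed = True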
metadata-ingestion/src/datahub/ingestion/api/ingestion_job_state_provider.py

This file was deleted.

@@ -47,7 +47,7 @@
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.ingestion_job_state_provider import JobId
+from datahub.ingestion.api.ingestion_job_checkpointing_provider_base import JobId
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.aws import s3_util
 from datahub.ingestion.source.aws.aws_common import AwsSourceConfig
55 changes: 3 additions & 52 deletions metadata-ingestion/src/datahub/ingestion/source/dbt/dbt_common.py
@@ -5,19 +5,7 @@
 from dataclasses import dataclass, field
 from datetime import datetime
 from enum import auto
-from typing import (
-    Any,
-    Callable,
-    ClassVar,
-    Dict,
-    Iterable,
-    List,
-    Optional,
-    Tuple,
-    Type,
-    Union,
-    cast,
-)
+from typing import Any, Callable, ClassVar, Dict, Iterable, List, Optional, Tuple, Union
 
 import pydantic
 from pydantic import root_validator, validator
@@ -41,7 +29,6 @@
     platform_name,
     support_status,
 )
-from datahub.ingestion.api.ingestion_job_state_provider import JobId
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.sql.sql_types import (
     BIGQUERY_TYPES_MAP,
@@ -52,11 +39,7 @@
     resolve_postgres_modified_type,
     resolve_trino_modified_type,
 )
-from datahub.ingestion.source.state.checkpoint import Checkpoint
-from datahub.ingestion.source.state.dbt_state import DbtCheckpointState
-from datahub.ingestion.source.state.sql_common_state import (
-    BaseSQLAlchemyCheckpointState,
-)
+from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalHandler,
     StaleEntityRemovalSourceReport,
@@ -65,7 +48,6 @@
 from datahub.ingestion.source.state.stateful_ingestion_base import (
     StatefulIngestionConfigBase,
     StatefulIngestionSourceBase,
-    StateType,
 )
 from datahub.metadata.com.linkedin.pegasus2avro.common import (
     AuditStamp,
@@ -684,42 +666,11 @@ def __init__(self, config: DBTCommonConfig, ctx: PipelineContext, platform: str)
         self.stale_entity_removal_handler = StaleEntityRemovalHandler(
             source=self,
             config=self.config,
-            state_type_class=DbtCheckpointState,
+            state_type_class=GenericCheckpointState,
             pipeline_name=self.ctx.pipeline_name,
             run_id=self.ctx.run_id,
         )
 
-    def get_last_checkpoint(
-        self, job_id: JobId, checkpoint_state_class: Type[StateType]
-    ) -> Optional[Checkpoint]:
-        last_checkpoint: Optional[Checkpoint]
-        is_conversion_required: bool = False
-        try:
-            # Best-case that last checkpoint state is DbtCheckpointState
-            last_checkpoint = super(DBTSourceBase, self).get_last_checkpoint(
-                job_id, checkpoint_state_class
-            )
-        except Exception as e:
-            # Backward compatibility for old dbt ingestion source which was saving dbt-nodes in
-            # BaseSQLAlchemyCheckpointState
-            last_checkpoint = super(DBTSourceBase, self).get_last_checkpoint(
-                job_id, BaseSQLAlchemyCheckpointState  # type: ignore
-            )
-            logger.debug(
-                f"Found BaseSQLAlchemyCheckpointState as checkpoint state (got {e})."
-            )
-            is_conversion_required = True
-
-        if last_checkpoint is not None and is_conversion_required:
-            # Map the BaseSQLAlchemyCheckpointState to DbtCheckpointState
-            dbt_checkpoint_state: DbtCheckpointState = DbtCheckpointState()
-            dbt_checkpoint_state.urns = (
-                cast(BaseSQLAlchemyCheckpointState, last_checkpoint.state)
-            ).urns
-            last_checkpoint.state = dbt_checkpoint_state
-
-        return last_checkpoint
-
     def create_test_entity_mcps(
         self,
         test_nodes: List[DBTNode],
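The deleted get_last_checkpoint override existed only to migrate legacy BaseSQLAlchemyCheckpointState payloads into DbtCheckpointState. With dbt (and kafka, below) switched to the shared GenericCheckpointState, per-source migration shims like this no longer belong in the source itself. A sketch of the shared shape, assuming GenericCheckpointState is a pydantic model with a flat urns list:

from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState

# One state shape for every source: the entity urns observed during a run.
state = GenericCheckpointState(
    urns=["urn:li:dataset:(urn:li:dataPlatform:dbt,sample_db.model_a,PROD)"]
)

# Stale-entity removal diffs the previous run's urns against the current run's
# and soft-deletes whatever disappeared.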
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/kafka.py
@@ -27,7 +27,7 @@
 from datahub.ingestion.api.registry import import_path
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.kafka_schema_registry_base import KafkaSchemaRegistryBase
-from datahub.ingestion.source.state.kafka_state import KafkaCheckpointState
+from datahub.ingestion.source.state.entity_removal_state import GenericCheckpointState
 from datahub.ingestion.source.state.stale_entity_removal_handler import (
     StaleEntityRemovalHandler,
     StaleEntityRemovalSourceReport,
@@ -145,7 +145,7 @@ def __init__(self, config: KafkaSourceConfig, ctx: PipelineContext):
         self.stale_entity_removal_handler = StaleEntityRemovalHandler(
             source=self,
             config=self.source_config,
-            state_type_class=KafkaCheckpointState,
+            state_type_class=GenericCheckpointState,
             pipeline_name=self.ctx.pipeline_name,
             run_id=self.ctx.run_id,
         )